Python Forum
Problem with Python, MySQL and Multi-threading queries
Thread Rating:
  • 2 Vote(s) - 2.5 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Problem with Python, MySQL and Multi-threading queries
#1
Hello guys. I hope you can help me with the following issue:
I am working with python, MySQL and multithreading, and really newbie in all of this; however, I am trying to do multiple sql queries at 3 threadings I have. That threads consist in general loop that is reading a specific table from database.

I am using just one database. I use mysql.connector provided by MySQL for Windows. Here is my code:

# -*- coding: utf-8 -*- import threading import mysql.connector import win32con import sys, os import struct import time from win32api import * from win32gui import * from mysql.connector import Error from threading import Thread cnx = mysql.connector.connect(host="localhost",                              user="root",                              password="1234",                              database="gammu") cnx2 = mysql.connector.connect(host="localhost",                              user="root",                              password="1234",                              database="gammu") cnx3 = mysql.connector.connect(host="localhost",                              user="root",                              password="1234",                              database="gammu") cursor = cnx.cursor() cursor2 = cnx.cursor() cursor3 = cnx.cursor() global errKey global errHash class WindowsBalloonTip:    def __init__(self, title, msg):        message_map= {            win32con.WM_DESTROY: self.OnDestroy,        }        wc = WNDCLASS()        hinst = wc.hInstance = GetModuleHandle(None)        wc.lpszClassName = "NotificationTaskR"        wc.lpfnWndProc = message_map        classAtom = RegisterClass(wc)        style = win32con.WS_OVERLAPPED | win32con.WS_SYSMENU        self.hwnd = CreateWindow(classAtom, "Taskbar", style, 0, 0, \                                 wind32con.CW_USEDEFAULT, win32con.CW_USEDEFAULT, \                                 0, 0, hinst, None)        UpdateWindw(self.hwnd)        iconPathName = os.path.abspath(os.path.join(sys.path[0], 'balloontip.ico'))        icon_flags = win32con.LR_LOADFROMFILE | win32con.LR_DEFAULTSIZE        try:            hicon = LoadImage(hinst, iconPathName, win32con.IMAGE_ICON, 0,0, icon_flags)        except:            hicon = LoadIcon(0, win32con.IDI_APPLICATION)        flags = NIF_ICON | NIF_MESSAGE | NIF_TIP        nid = (self.hwnd, 0, flags, win32con.WM_USER+20, hicon, "New message received")        Shell_NotifyIcon(NIM_ADD, nid)        Shell_NotifyIcon(NIM_MODIFY, (self.hwnd, 0, NIF_INFO, win32con.WIM_USER+20, \                                      hicon, "Balloon tooltip", msg, 200, title))        time.sleep(3)        DestroyWindow(self.hwnd)        classAtom = UnregisterClass(classAtom, hinst)    def OnDestroy(self, hwnd, msg, wparam, lparam):        nid = (self.hwnd, 0)        Shell_NotifyIcon(NIM_DELETE, nid)        PostQuitMessage(0) def balloon_tip(title, msg):    w = WindowsBalloonTip(title, msg) def newWh():    while True:        cnx.connect()        lastRquery = ("select UpdatedInDB, AES_DECRYPT(TextDecoded, (select AES_DECRYPT(cryptkey, '373630303a3a3') from decryptkey)), "                      "AES_DECRYPT(SenderNumber, (select AES_DECRYPT(cryptkey, '373630303a3a3') from decryptkey)), ID from inboxencrypt where "                      "chck=0 order by UpdatedInDB asc limit 1")        cursor.execute(lastRquery)        regL = cursor.fetchall()        if not regL:            cnx.close()            import time            time.sleep(3)            continue        else:            for row in regL:                time = row[0]                timeC = str(time)                timeO = timeC[10:]                message = row[1]                telephone = row[2]                toUR = row[3]                lengM = len(message)            smsDetect = message[27:]            procMR = message.split()            if procMR[0] == "\send":                #Detect first command                cnx.close()                cnx.connect()                queryAuth = ("SELECT privkey FROM transferauth WHERE "                             "privkey=sha(%s)", procMR[1])                cursor.execute(queryAuth)                authStorage = cursor.fetchall()                for row in authStorage:                    key = authStorage[0]                if procMR[1] == key:                    #Detect authentication key                    if procMR[2] == "\a":                        #Detect command to send multiple messages to all contact list                        print("Ok")                    elif len(procMR[2]) == 13:                        #Send message to specific contact                        if telpComm[0] == "+" and telpComm [1:3] == "58":                            #Verifying international code                            telpComm == procMR[2]                            if len(smsDetect) > 3:                                #Verifying minimum message length                                if smsDetect[-1] == '"' and smsDetect[0] == '"':                                    #Verifying message format and sending message                                    msgToSend = smsDetetct[1:-1]                                    cnx.close()                                    cnx.connect()                                    sendQuerExt = ("insert into outbox(DestinationNumber, "                                                   "TextDecoded, CreatorID) values "                                                   "('%s','%s','PwC')" % (telpComm, msgToSend))                                    cursor.execute(sendQuerExt)                                    cnx.commit()                                    cnx.close()                                    #Reconnecting to database and ipdating message status                                    cnx.connect()                                    updateRquery = ("update inbox set chck=1 where "                                                    "ID=%s" % toUR)                                    cursor.execute(updateRquery)                                    cnx.commit()                                    cnx.close()                                    #Wait for re-establish loop                                    import time                                    time.sleep(3)                                    print("~~~Send message is required for: "+telephone)                                    print("~~~Send message in required for: "+telpComm+"\n\n")                                    continue                                else:                                    #Invalid message format => ERR_FORMATSMS_INVALID                                    cnx.connect()                                    srcErrorSource = ("select errordesc, hexcode from transfererrors "                                                      "where cause='ERR_FORMATSMS_INVALID'")                                    cursor.execute(srcErrorSource)                                    errStorageA = cursor.fetchall()                                    for row in errStorageA:                                        errKey = row[0]                                        errHash = row[1]                                        cnx.close()                                    cnx.connect()                                    recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "                                                         "TextDecoded, CreatorID) values "                                                         "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))                                    cursor.execute(recQueryErrorNumb)                                    cnx.commit()                                    cnx.close()                                    #Reconnecting to database and updating message status                                    cnx.connect()                                    updateRquery = ("update inbox set chck=1 where "                                                    "ID=%s" % toUR)                                    cursor.execute(updateRquery)                                    cnx.commit()                                    cnx.close()                                    #Wait for re-establish loop                                    import time                                    time.sleep(3)                                    continue                            else:                                #Minimum message length required => ERR_LENGTHSMS_INVALID                                srcErrorSource = ("select errordesc, hexcode from transferrors "                                                      "where cause='ERR_LENGTH_INVALID'")                                cursor.execute(srcErrorSource)                                errStorageA = cursor.fetchall()                                for row in errStorageA:                                    errKey = row[0]                                    errHash = row[1]                                    cnx.close()                                cnx.connect()                                recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "                                                         "TextDecoded, CreatorID) values "                                                         "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))                                cursor.execute(recQueryErrorNumb)                                cnx.commit()                                cnx.close()                                #Reconnecting to database and updating message status                                cnx.connect()                                updateRquery = ("update inbox set chck=1 where "                                                    "ID=%s" % toUR)                                cursor.execute(updateRquery)                                cnx.commit()                                cnx.close()                                #Wait for re-establish loop                                import time                                time.sleep(3)                                continue                        else:                            #Wrong international telephone format => ERR_FORMATNUMBER_INVALID                            srcErrorSource = ("select errordesc, hexcode from transfererrors "                                              "where cause='ERR_FORMATNUMBER_INVALID'")                            cursor.execute(srcErrorSource)                            errStorageA = cursor.fetchall()                            for row in errStorageA:                                errKey = row[0]                                errHash = row[1]                            cnx.connect()                            recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "                                                 "TextDecoded, CreatorID) values "                                                 "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))                            cursor.execute(recQueryErrorNumb)                            cnx.commit()                            cnx.close()                            #Reconnecting to database and updating message status                            cnx.connect()                            updateRquery = ("update inbox set chck=1 where "                                            "ID=%s" % toUR)                            cursor.execute(updateRquery)                            cnx.commit()                            cnx.close()                            #Wait for re-establish loop                            import time                            time.sleep(3)                            continue                    else:                        #Unknown message or command => ERR_TLPNUMBER_INVALID                        srcErrorSource = ("select errordesc, hexcode from transfererrors "                                                      "where cause='ERR_TLPNUMBER_INVALID'")                        cursor.execute(srcErrorSource)                        errStorageA = cursor.fetchall()                        for row in errStorageA:                            errKey = row[0]                            errHash = row[1]                            cnx.close()                        cnx.connect()                        recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "                                                         "TextDecoded, CreatorID) values "                                                         "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))                        cursor.execute(recQueryErrorNumb)                        cnx.commit()                        cnx.close()                        #Reconnecting to database and updating message status                        cnx.connect()                        updateRquery = ("update inbox set chck=1 where "                                                    "ID=%s" % toUR)                        cursor.execute(updateRquery)                        cnx.commit()                        cnx.close()                        #Wait for re-establish loop                        import time                        time.sleep()                        continue                else:                    #Authetication error, wron key => ERR_AUTHKEY_INVALID                    srcErrorSource = ("select errordesc, hexcode from transfererrors "                                                      "where cause='ERR_AUTHKEY_INVALID'")                    cursor.execute(srcErrorSource)                    errStorageA = cursor.fetchall()                    for row in errStorageA:                        errKey = row[0]                        errHash = row[1]                        cnx.close()                    cnx.connect()                    recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "                                                         "TextDecoded, CreatorID) values "                                                         "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))                    cursor.execute(recQueryErrorNumb)                    cnx.commit()                    cnx.close()                    #Reconnecting to database and updating message status                    cnx.connect()                    recQueryErrorNumb = ("update inbox set chck=1 where "                                                         "ID=%s" % toUR)                    cursor.execute(updateRquery)                    cnx.commit()                    cnx.close()                    #Wait for re-establish loop                    import time                    time.sleep()                    continue            else:                print("|| "+telephone+"\t\t"+timeO)                print("|| "+message+"\n\n")                balloon_tip("Mensaje recibido",message+"\nDe: "+telephone+"\t"+timeO)                file = open("smscenter.log","a")                file.write("Message was received at: "+timeO+"\n")                file.write("----------------------------------------\n")                file.write("|| "+telephone+"\n")                file.write("|| "+message.encode("utf8")+"\n")                file.write("-----Length of sms: "+str(lengM)+"\n")                file.write("-----Recognize: \n\n\n")                file.close()                cnx.close()                cnx.connect()                updateRquery = ("update inbox set chck=1 where ID=%s" % toUR)                cursor.execute(updateRquery)                cnx.commit()                cnx.close()                import time                time.sleep(3)                continue             def readWh():    while True:        cnx2.connect()        lastRquery2 = ("select UpdatedInDB, TextDecoded, SenderNumber, ID from inbox where "                       "chck=0 order by UpdatedInDB asc limit 1")        cursor2.execute(lastRquery2)        regL2 = cursor2.fetchall()        if not regL2:            cnx2.close()            cnx2.connect()            queryTrunc = ("delete from inbox where chck=1")            cursor2.execute(queryTrunc)            cnx2.commit()            cnx2.close()            import time            time.sleep(3)            continue        else:            for row in regL2:                timeReceived = row[0]                timeReceivedC = str(timeReceived)                timeReceivedO = timeReceivedC[10:]                messageRec = row[1]                telephoneRec = row[2]                idRec = row[3]                cnx2.close()            print("Hola menor")            cnx2.connect()            queryA = ("insert into inboxencrypt(id, UpdatedInDB, TextDecoded, SenderNumber) "                      "values (%s, %s, AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)), "                      "AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)))")            cursor2.execute(queryA)            cnx2.commit()            cnx2.close()            #Drop message            cnx2.connect()            queryB = ("update inbox set chck=1 where "                      "ID=%s" % idRec)            cursor2.execute(queryB)            cnx2.commit()            cnx2.close()            cnx2.connect()            queryC = ("delete from inbox where chck=1")            cursor2.execute(queryC)            cnx2.commit()            cnx2.close()            continue def sentWh():    while True:        cnx3.connect()        lastRquery3 = ("select UpdatedInDB, InsertIntoDB, SendingDateTime, DestinationNumber, "                       "TextDecoded, ID, Status, CreatorID from sentitems where chck=0 order by UpdatedInDB asc limit 1")        cursor3.execute(lastRquery3)        regL3= cursor3.fetchall()        if not regLr:            cnx3.close()            cnx3.connect()            queryTrunc1 = ("delete from sentitems where chck=1")            cursor3.execute(queryTrunc1)            cnx3.commit()            cnx3.close()            import time            time.sleep(3)            print("Hola menor")            continue        else:            for row in regL3:                timeUpdated = row[0]                timeInsert = row[1]                timeSent = row[2]                telephoneSent = row[3]                messageSent = row[4]                idSent = row[5]                status = row[6]                creator = row[7]                cnx3.close()            cnx3.connect()            print("Hola menor")            queryD = ("insert into sentencrypt(id, UpdatedInDB, InsertIntoDB, SendingDateTime, "                      "DestinationNumber, TextDecoded, Status, CreatorID) values "                      "(%s, '%s', '%s', '%s', "                      "AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)), "                      "AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)), "                      "'%s', AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)))"                      % (idSent, timeUpdated, timeInsert, timeSent, telephoneSent, messageSent, status, creator))            cursor3.execute(queryD)            cnx3.commit()            cnx3.close()            #Drop message            cnx3.connect()            queryZ = ("update sentitems set chck=1 where "                      "ID=%s" % idSent)            cursor3.execute(queryZ)            cnx3.commit()            cnx3.close()            cnx3.connect()            queryE = ("delete from sentitems where chck=1")            cursor.execute(queryE)            cnx3.commit()            cnx3.close()            import time            time.sleep(3)            continue if __name__ == '__main__':    print("Be sure you have Windows pop-ups notifications enabled.\n\n")    print("SMS Center will show pop-ups notifications when message is being received."          "\nThis window need to be open at anytime.")    Thread(target = newWh).start()    Thread(target = readWh).start()    Thread(target = sentWh).start()
As you can see, I have 3 functions that have a loop (newWh, readWh and sentWh = these are the three functions, their loops are infinite). But, when I execute it, I obtain the following error:

                (I can't post the image url)


I don't know if there are problem with MySQL, I was searching about initialize more MySQL instances but I don't find anything that show how to do this in Windows.

So, I don't know, I believe too that is a problem with MySQL ports. I don't know what is the problem, for this reason, I hope that you can help me guys.

If you see multiple source code errors, notify me, I am a new programmer on python and newbie (x1000) working with multi-threading, so, if there are ways the optimize that source code with inheritance or oop programming, notify me, I want to learn python at pro level. Thank you for help.

There is the issue image:

             https://imgur.com/a/OPhpF
Reply
#2
Oh, sorry again. This is my reply:


This is the first error:
Error:
... packet_number, compressed_packet_number) File "C:\Python27\lib\site-packages\mysql\connector\network.py", line 143, in send_plain errno=2055, values=(self.get_address(), _strioerror(err))) OperationalError: 2055: Lost connection to MySQL server at 'localhost:3306', system error: 9 Bad file descriptor Exception in thread Thread=3: Traceback (most recen call last): File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\Python27\lib\threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "C:\Users\PwC User\Desktop\answerRecognizer.py", line 362, in sentWh cursor3.execute(lastRquery3) File "C:\Python27\lib\site-packages\mysql\connector\cursor.py", line 559, in execute self._handle_result(self._connection.cmd_query(stmt)) File "C:\Python27\lib\site-packages\mysql\connector\connection.py", line 494, in cmd_query result = self._handle_result(self._send_cmd(ServerCmd.QUERY, query)) File "C:\Python27\lib\site-packages\mysql\connector\connection.py", line 262, in _send_cmd packet_number, compressed_packet_number) File "C:\Python27\lib\site-packages\mysql\connector\network.py", line 143, in send_plain errno=2055, values=(self.get_address(), _strioerror(err))) OperationalError: 2055: Lost connection to MySQL server at 'localhost:3306', system error: 9 Bad file descriptor
And, sometimes I give this error:
Error:
Exception in thread Thread=1: Traceback (most recent call last): File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\Python27\lib\threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "C:\Users\PwC User\Desktop\answerRecognizer.py", line 80, in newWh cursor.execute(lastRquery) File "C:\Python27\lib\site-packages\mysql\connector\cursor.py", line 559, in execute self._handle_result(self._connection.cmd_query(stmt)) File "C:\Python27\lib\site-packages\mysql\connector\connection.py", line 494, in cmd_query result = self._handle_result(self._send_cmd(ServerCmd.QUERT, query)) File "C:\Python27\lib\site-packages\mysql\connector\connection.py", line 406, in _handle_result self._socket.recv(), self.python_charset) File "C:\Python27\lib\site-packages\mysql\connector\protocol.py", line 247, in parse_column raise errors.InterfaceError("Failed parsing column information") InterfaceError: Failed parsing column information
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Need to change connection and queries from SQLite to MySQL Bxapocalypse1 2 857 Oct-20-2025, 11:35 PM
Last Post: Pedroski55
  Mysql and mysql.connector error lostintime 2 2,507 Oct-03-2023, 10:25 PM
Last Post: lostintime
  Too many queries? lorasf 6 3,046 Jul-04-2023, 04:27 AM
Last Post: lorasf
  Concurrent futures threading running at same speed as non-threading billykid999 13 6,354 May-03-2023, 08:22 AM
Last Post: billykid999
  Mysql error message: Lost connection to MySQL server during query tomtom 6 24,139 Feb-09-2022, 09:55 AM
Last Post: ibreeden
Question Debian 11 Bullseye | Python 3.9.x | pip install mysql-connector-python-rf problems BrandonKastning 4 10,102 Feb-05-2022, 08:25 PM
Last Post: BrandonKastning
  Tutorials on sockets, threading and multi-threading? muzikman 2 3,505 Oct-01-2021, 08:32 PM
Last Post: muzikman
  Problem Using SQL Placeholder In MySQL Query AdeS 11 14,329 Jul-31-2021, 12:19 AM
Last Post: Pedroski55
  Can I open\use threading in Python? korenron 2 2,969 Jun-30-2021, 10:42 AM
Last Post: korenron
  Python and MySql ogautier 8 5,994 May-20-2021, 11:10 PM
Last Post: Pedroski55

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020
This forum uses Lukasz Tkacz MyBB addons.
Forum use Krzysztof "Supryk" Supryczynski addons.