用来调试嵌入式服务器连接数,写的简单python脚本,嵌入式python,欢迎大家给点意见,很少用


欢迎大家给点意见,很少用python的

#python test max connnect script#write by iilicshi#2012-2-12#coding=utf8import http.clientimport timeimport sysimport threadingdef tryConn(url):    try:        conn = http.client.HTTPConnection(url)        try:            conn.request("GET","/")            res = conn.getresponse()            context = conn.read()            return res.status        finally:            if conn is not None:                conn.close()    except Exception:        return 400def testConn(thread_id, url, loop_count):    global g_mutex    for i in range(loop_count):        g_mutex.acquire()        print(">>>>>>>>>now thread id is [" + str(thread_id) + "] doing request " + str(i) + ": start now")        g_mutex.release()         tryConn(url)        g_mutex.acquire()        print("<<<<<<<<<now thread id is [" + str(thread_id) + "] doing request " + str(i) + ": end")        g_mutex.release() def multTask(threadNum, url):    global g_mutex    loop_count = 100    if threadNum > 5000:        return "Too more thread!"    #init thread_pool    thread_pool = []    g_mutex = threading.Lock()    for i in range(threadNum):        newThread = threading.Thread(target=testConn, args=(i, url, loop_count))        thread_pool.append(newThread)    for i in range(threadNum):        thread_pool[i].start()    #用消息通知的方法比以下代码的有意义,类似于‘心跳’.    #目前缺杀死线程的处理    for i in range(threadNum):        threading.Thread.join(thread_pool[i])if __name__ == "__main__":    url = input("Please input the url what u want test: ")    threadNum = int(input("thread num: "))    multTask(threadNum, url)

评论关闭