import os
import threading
import time
def MyThread(x,y): #定义每个线程要执行的函数体
print("传递的数据:%s,%s"%(x,y)) #其中有两个参数,我们动态传入
time.sleep(5) #睡眠5秒钟
for x in range(10): #创建10个线程并发执行函数
thread = threading.Thread(target=MyThread,args=(x,x+1,)) #args是函数的参数,元组最后一个必须要逗号.
thread.start() #启动线程
使用类创建线程: 通过定义类,传递给类中一些参数,然后启动多线程,这种方式不常用.
import os
import threading
import time
class MyThread(threading.Thread): #继承threading.Thread类
def __init__(self,x,y): #重写构造函数
super(MyThread,self).__init__() #先执行父类的构造方法
self.x = x
self.y = y
def run(self): #run()方法,是cpu调度线程会使用的方法,名称必须是run
print("运行线程, X=%s Y=%s"%(self.x,self.y))
for i in range(10): #创建10个线程
obj = MyThread(i,i+10)
obj.start()
import paramiko,datetime,threading
class MyThread(threading.Thread):
def __init__(self,address,username,password,port,command):
super(MyThread, self).__init__()
self.address = address
self.username = username
self.password = password
self.port = port
self.command = command
def run(self):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1)
stdin, stdout, stderr = ssh.exec_command(self.command)
result = stdout.read()
if not result:
self.result = stderr.read()
ssh.close()
self.result = result.decode()
except Exception:
self.result = "0"
def get_result(self):
try:
return self.result
except Exception:
return None
ThreadPool = [] # 定义线程池
starttime = datetime.datetime.now()
for item in range(5):
obj = MyThread("192.168.1.20","root","123","22","ifconfig")
ThreadPool.append(obj)
for item in ThreadPool:
item.start() # 启动线程
item.join()
for item in ThreadPool:
ret = item.get_result() # 获取返回结果
print(ret)
endtime = datetime.datetime.now()
print("程序开始运行:{} 结束:{}".format(starttime,endtime))
接收线程返回结果: 我们可以使用join方法,等待线程执行完毕后的返回结果.
import os
import threading
import time
def MyThread(x,y): #定义每个线程要执行的函数体
print("传递的数据:%s,%s"%(x,y)) #其中有两个参数,我们动态传入
time.sleep(5) #睡眠5秒钟
return "ok"
temp=[]
for x in range(10): #创建10个线程并发执行函数
thread = threading.Thread(target=MyThread,args=(x,x+1,)) #args是函数的参数,元组最后一个必须要逗号.
thread.start() #启动线程
temp.append(thread) #将线程结果添加到列表
for y in temp: #遍历这个线程列表
#此处一定要join,不然主线程比子线程跑的快,会拿不到结果,程序就退出执行了.
y.join() #等待线程执行完毕,返回结果
print("线程: %s"%y)
import time
import threading
num = 0 #定义全局共享变量
thread_list = [] #线程列表
lock = threading.Lock() #生成全局锁
def SumNumber():
global num #在每个线程中获取这个全局变量
time.sleep(2)
lock.acquire() #修改数据前给数据加锁
num += 1 #每次进行递增操作
lock.release() #执行完毕以后,解除锁定
for x in range(50): #指定生成线程数
thread = threading.Thread(target=SumNumber)
thread.start() #启动线程
thread_list.append(thread) #将结果列表加入到变量中
for y in thread_list: #等待执行完毕.
y.join()
print("计算结果: ",num)
递归锁(RLock): 递归锁和全局锁差不多,递归锁就是在大锁中还要添加个小锁,递归锁是常用的锁.
import threading
import time
num = 0 #初始化全局变量
lock = threading.RLock() #设置递归锁
def fun1():
lock.acquire() #添加递归锁
global num
num += 1
lock.release() #关闭递归锁
return num
def fun2():
lock.acquire() #添加递归锁
res = fun1()
print("计算结果: ",res)
lock.release() #关闭递归锁
if __name__ == "__main__":
for x in range(10): #生成10个线程
thread = threading.Thread(target=fun2)
thread.start()
while threading.active_count() != 1: #等待所有线程执行完成
print(threading.active_count())
else:
print("所有线程运行完成...")
print(num)
互斥锁(Semaphore): 同时允许一定数量的线程更改数据,也就是限制每次允许执行的线程数.
import threading,time
num = 0 #初始化变量
semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
def run(n):
semaphore.acquire() #添加信号
time.sleep(1)
print("运行这个线程中: %s"%n)
semaphore.release() #关闭信号
if __name__ == '__main__':
for i in range(20): #同时执行20个线程
t = threading.Thread(target=run, args=(i,))
t.start()
while threading.active_count() != 1: #等待所有线程执行完毕
pass
else:
print('----所有线程执行完毕了---')
print(num)
import threading
import time
def func():
print("hello python")
for i in range(5): #指定5个线程
thread = threading.Timer(5,func) #在5秒钟以后运行func函数
thread.start()
import multiprocessing
import time
def func(name):
time.sleep(2)
print("hello",name)
if __name__ == "__main__":
for i in range(5):
proc = multiprocessing.Process(target=func,args=("lyshark",))
proc.start()
proc.join()
创建进程(2): 创建5个进程,并在每个进程里启动1个线程,线程打印出线程的ID号.
import multiprocessing
import threading
import time
def thread_run():
print("子线程->子线程ID: %s" %threading.get_ident())
def func(num):
time.sleep(2)
print("-------------------------------->>> 主线程->主线程ID %s" %num)
for i in range(5): #在主线程里开辟5个子线程
thread = threading.Thread(target=thread_run,) #嵌套一个子线程
thread.start() #执行子线程
if __name__ == "__main__":
for i in range(5): #启动5个主线程
proc = multiprocessing.Process(target=func,args=(i,))
proc.start()
#proc.join()
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
def foo(i,arg):
arg.put(i)
print('say hi',i,arg.qsize())
li = queues.Queue(20,ctx=multiprocessing)
for i in range(10):
p = Process(target=foo,args=(i,li,))
p.start()
import multiprocessing
import time
def foo(num):
time.sleep(2)
print("进程执行-->: %s"%num)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=5) #允许进程池同时放入5个进程
for i in range(10):
pool.apply(func=foo,args=(i,)) #并行执行每次执行一个
print("ends ...")
pool.close()
pool.join()
import multiprocessing
import time
def foo(num):
time.sleep(2)
print("进程执行-->: %s"%num)
def bar(arg):
print("call back 函数执行..")
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=5) #允许进程池同时放入5个进程
for i in range(10):
pool.apply_async(func=foo,args=(i,),callback=bar) #每次执行进程结束,自动执行callback指定的函数
print("ends ...")
pool.close()
pool.join()
import queue
import threading
import time
q = queue.Queue()
def productor(arg):
while True:
q.put(str(arg))
print("%s 号窗口有票...."%str(arg))
time.sleep(1)
def consumer(arg):
while True:
print("第 %s 人取 %s 号窗口票"%(str(arg),q.get()))
time.sleep(1)
for i in range(10): #负责生产票数
t = threading.Thread(target=productor,args=(i,))
t.start()
for j in range(5): #负责取票,两个用户取票
t = threading.Thread(target=consumer,args=(j,))
t1 = threading.Thread(target=consumer,args=(j,))
t.start()
t1.start()