Python - Multithreading
When to use which
see [1]
use threading if your program is I/O/network limited
use multiprocessing if it's CPU limited. (here the entire memory is copied into each subprocess)
Example for both
import multiprocessing
import threading
import queue
import os
import time
import math
def worker_core(i: int, s: str) -> list:
"""
used in worker_core_cpu and worker_core_io
"""
result = i, s, os.getpid()
time.sleep(.1)
return result
def worker_core_cpu(i: int, s: str) -> list:
"""
used in all CPU limited worker implementations
"""
sum = 0
for j in range(1_000):
sum += math.sqrt(i+j)
result = worker_core(i=i, s=s)
return result
def worker_core_io(i: int, s: str) -> list:
"""
used in all I/O limited worker implementations
"""
time.sleep(.1)
result = worker_core(i=i, s=s)
return result
def worker_multiprocessing_1(i: int, s: str) -> list:
" used in multiprocessing_1 "
return worker_core_cpu(i=i, s=s)
def worker_multiprocessing_2(q_work: multiprocessing.Queue, q_results: multiprocessing.Queue):
" used in multiprocessing_2 -> not faster than multiprocessing_1"
while not q_work.empty():
i, s = q_work.get()
result = worker_core_cpu(i=i, s=s)
q_results.put(result)
def worker_threading_1(q_work: queue.Queue, results: dict):
"used in threading_1"
while not q_work.empty():
i, s = q_work.get()
result = worker_core_io(i=i, s=s)
results[i] = result
q_work.task_done()
def multiprocessing_1(l_pile_of_work: list):
"""
for CPU limited tasks
"""
num_processes = min(multiprocessing.cpu_count()-1, len(l_pile_of_work))
pool = multiprocessing.Pool(processes=num_processes)
# use map for 1 argument, use starmap for multiple arguments for worker function
# here each item in l_pile_of_work is a list of 3 elements -> starmap
l_results_unsorted = pool.starmap(
func=worker_multiprocessing_1,
iterable=l_pile_of_work)
l_results = sorted(l_results_unsorted) # sort by id
return l_results
def multiprocessing_2(l_pile_of_work: list):
"""
for CPU limited tasks
not using not re-creation of processes, but fixed number instead
result: slower than multiprocessing_1
"""
q_pile_of_work = multiprocessing.Queue(maxsize=len(l_pile_of_work))
q_results = multiprocessing.Queue(maxsize=len(l_pile_of_work))
for params in l_pile_of_work:
q_pile_of_work.put(params)
del params
num_processes = min(multiprocessing.cpu_count()-1, len(l_pile_of_work))
l_processes = []
for i in range(num_processes):
p = multiprocessing.Process(
name='myProcess-'+str(i),
target=worker_multiprocessing_2,
args=(q_pile_of_work, q_results),
daemon=True)
l_processes.append(p)
del i
for p in l_processes:
p.start()
print("started")
while not q_results.full():
time.sleep(.1)
for p in l_processes:
p.terminate()
p.join()
p.close()
print("closed")
del p
l_results_unsorted = []
while not q_results.empty():
result = q_results.get()
l_results_unsorted.append(result)
l_results = sorted(l_results_unsorted) # sort by id
return l_results
def threading_1(l_pile_of_work: list, num_threads: int):
"""
for I/O limited tasks
"""
q_pile_of_work = queue.Queue(
maxsize=len(l_pile_of_work)) # maxsize=0 -> unlimited
for params in l_pile_of_work:
q_pile_of_work.put(params)
d_results = {} # threads can write into dict
l_threads = [] # List of threads, not used here
for i in range(num_threads):
t = threading.Thread(name='myThread-'+str(i),
target=worker_threading_1,
args=(q_pile_of_work, d_results),
daemon=True)
l_threads.append(t)
t.start()
q_pile_of_work.join() # wait for all threas to complete
l_results_unsorted = d_results.values()
l_results = sorted(l_results_unsorted) # sort by id
return l_results
if __name__ == '__main__':
l_pile_of_work = []
loops = 1_000
for i in range(loops):
# use index as first parameter
L2 = (i, "n"+str(i))
l_pile_of_work.append((L2))
del L2, i
time_start = time.time()
# results = multiprocessing_1(l_pile_of_work)
# # or
# results = multiprocessing_2(l_pile_of_work)
# # or
results = threading_1(l_pile_of_work, num_threads=100)
duration = time.time() - time_start
print("%d sec = %.1f min" % (duration, duration/60))
# for res in results:
# print('task %d, name %s was done in process %d' % (res))
# print(len(results))
Threading V2: 18.04.2020
see [2]
import multiprocessing as mp # for fetching number of CPUs
import threading
import concurrent.futures
def series_of_fits_multi_threading(data: list, fit_range: int = 7, max_days_past=14) -> list:
fit_series_res = {}
l_last_days_for_fit = range(0, -max_days_past, -1)
with concurrent.futures.ThreadPoolExecutor(max_workers=mp.cpu_count()) as executor:
# Start the load operations and mark each future with its data set
list_future = {executor.submit(
series_of_fits_worker_thread, data, fit_range, last_day_for_fit): last_day_for_fit for last_day_for_fit in l_last_days_for_fit}
for future in concurrent.futures.as_completed(list_future):
last_day_for_fit = list_future[future]
d_this_fit_result = {}
try:
d_this_fit_result = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (day, exc))
if len(d_this_fit_result) != 0:
fit_series_res[last_day_for_fit] = round(
d_this_fit_result['fit_res'][1], 1)
return fit_series_res
def series_of_fits_worker_thread(data: list, fit_range: int, last_day_for_fit: int):
# print(threading.currentThread().getName(), 'Starting')
d = fit_routine(
data, (last_day_for_fit-fit_range, last_day_for_fit))
return d
V1
import threading
import logging
import time
import multiprocessing as mp # for fetching number of CPUs
# Python’s built-in data structures (lists, dictionaries, etc.) are thread-safe
# as a side-effect of having atomic byte-codes for manipulating them
# -> so locking is them is not required.
# Other data structures implemented in Python, or simpler types like integers and floats, don’t have that protection.
# from inside a thread it is possible to manipulate dic but not integers
# Logging is nicer than print, as it can automatically add the threadname
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(threadName)s: %(message)s',
)
# List of "work" to to
listPileOfWork = [x for x in range(7)]
# Threads can not write in integers, but in dictionaries !
# counter = 0
dic = {}
dic["Counter"] = 0
def worker():
logging.info('Starting')
# print(threading.currentThread().getName(), 'Starting')
while listPileOfWork :
x = listPileOfWork.pop(0)
logging.info("Took" + str(x))
dic["Counter"] += x
time.sleep(2)
logging.info('No more work to do')
threads = [] # List of threads
for i in range(mp.cpu_count()):
t = threading.Thread(name='myThread'+str(i), target=worker)
threads.append(t)
t.start()
# Wait for all threads to complete
for t in threads:
t.join()
logging.info('All threads done')
Multiprocessing
see [3]
V2
import multiprocessing
from os import getpid
import time
def worker(i, s):
print(i)
# print('I am number %d, %s in process %d' % (i, str, getpid()))
time.sleep(.1)
return i, s, getpid()
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
L1 = []
for i in range(100):
L2 = (i, "n"+str(i))
L1.append(L2)
# map for 1 argument, starmap for multiple arguments at worker
results_unsorted = pool.starmap(worker, L1)
results = sorted(results_unsorted)
for res in results:
print('task %d, name %s was done in process %d' % (res))
V1
From [4]: The threading module uses threads, the multiprocessing module uses processes. The difference is that threads run in the same memory space, while processes have separate memory. This makes it a bit harder to share objects between processes with multiprocessing. Since threads use the same memory, precautions have to be taken or two threads will write to the same memory at the same time. This is what the global interpreter lock is for. Spawning processes is a bit slower than spawning threads. Once they are running, there is not much difference.
Example from [5]
Prob: KeyboardInterrupt (CTRL+C) does not close program completely see [6] as starting point for a solution
from multiprocessing import Pool
def escapes(cr, ci, it):
"""Does iterating z <- z^2 + c escape after it iterations?"""
zr = 0.0
zi = 0.0
for i in xrange(it):
# z <- z^2 + c
zr,zi = zr*zr - zi*zi + cr, 2*zr*zi + ci
if zr*zr + zi*zi > 4:
return True
return False
def toChar(p):
if p: return " "
else: return "X"
def doRow((xmin,xmax,xstep, ymin,ymax,ystep, iterations, yc)):
"""Calculate one row of the output."""
y = yc*(ymax-ymin)/ystep + ymin
row = []
for xc in xrange(xstep):
x = xc*(xmax-xmin)/xstep + xmin
row.append( escapes(x, y, iterations) )
return "".join([toChar(p) for p in row])
def mandel(xmin,xmax,xstep, ymin,ymax,ystep, iterations):
"""Calculate and print a Mandelbrot set."""
pool = Pool() # askes the os for num of cpus ;-)
results = []
for yc in xrange(ystep):
res = pool.apply_async(doRow, ((xmin,xmax,xstep, ymin,ymax,ystep, iterations, yc),))
results.append(res)
for yc in xrange(ystep):
print results[yc].get()
if __name__=="__main__":
mandel(-2.0, 1.0, 80, -1.0, 1.0, 24, 20000)