Python - Multithreading


When to use which

see [1]
use threading if your program is I/O- or network-bound
use multiprocessing if it is CPU-bound (note that each subprocess works on its own copy of the parent's memory)
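
As a quick illustration of this rule of thumb, a minimal sketch using concurrent.futures; the helper functions fetch and crunch and the URL are made up for this example:

import concurrent.futures
import math
import urllib.request

def fetch(url: str) -> int:
    "I/O-bound: time is spent waiting on the network"
    with urllib.request.urlopen(url) as response:
        return len(response.read())

def crunch(n: int) -> float:
    "CPU-bound: time is spent computing"
    return sum(math.sqrt(i) for i in range(n))

if __name__ == '__main__':
    urls = ["https://example.com"] * 4
    # I/O-bound -> threads: all of them can wait on the network in parallel
    with concurrent.futures.ThreadPoolExecutor() as executor:
        sizes = list(executor.map(fetch, urls))
    # CPU-bound -> processes: each one gets its own interpreter and GIL
    with concurrent.futures.ProcessPoolExecutor() as executor:
        sums = list(executor.map(crunch, [1_000_000] * 4))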

Example for both

import multiprocessing
import threading
import queue
import os
import time
import math

def worker_core(i: int, s: str) -> tuple:
    """
    used in worker_core_cpu and worker_core_io
    """
    result = i, s, os.getpid()
    time.sleep(0.1)
    return result

def worker_core_cpu(i: int, s: str) -> tuple:
    """
    used in all CPU-bound worker implementations
    """
    total = 0  # do not shadow the built-in sum()
    for j in range(1_000):
        total += math.sqrt(i + j)
    result = worker_core(i=i, s=s)
    return result

def worker_core_io(i: int, s: str) -> tuple:
    """
    used in all I/O-bound worker implementations
    """
    time.sleep(0.1)
    result = worker_core(i=i, s=s)
    return result

def worker_multiprocessing_1(i: int, s: str) -> tuple:
    " used in multiprocessing_1 "
    return worker_core_cpu(i=i, s=s)

def worker_multiprocessing_2(q_work: multiprocessing.Queue, q_results: multiprocessing.Queue):
    " used in multiprocessing_2 -> not faster than multiprocessing_1 "
    while True:
        try:
            # a timeout avoids the race between empty() and a blocking get()
            i, s = q_work.get(timeout=1)
        except queue.Empty:
            break  # no work left
        result = worker_core_cpu(i=i, s=s)
        q_results.put(result)

def worker_threading_1(q_work: queue.Queue, results: dict):
    "used in threading_1"
    while True:
        try:
            i, s = q_work.get_nowait()
        except queue.Empty:
            break  # no work left
        result = worker_core_io(i=i, s=s)
        results[i] = result
        q_work.task_done()

def multiprocessing_1(l_pile_of_work: list):
    """
    for CPU-bound tasks
    """
    num_processes = min(multiprocessing.cpu_count() - 1, len(l_pile_of_work))
    # use map for 1 argument, use starmap for multiple arguments for the worker function
    # here each item in l_pile_of_work is a tuple of 2 elements -> starmap
    with multiprocessing.Pool(processes=num_processes) as pool:
        l_results_unsorted = pool.starmap(
            func=worker_multiprocessing_1,
            iterable=l_pile_of_work)
    l_results = sorted(l_results_unsorted)  # sort by id
    return l_results

def multiprocessing_2(l_pile_of_work: list):
    """
    for CPU-bound tasks
    uses a fixed number of long-lived processes instead of re-creating them
    result: slower than multiprocessing_1
    """
    q_pile_of_work = multiprocessing.Queue(maxsize=len(l_pile_of_work))
    q_results = multiprocessing.Queue(maxsize=len(l_pile_of_work))

    for params in l_pile_of_work:
        q_pile_of_work.put(params)
    del params

    num_processes = min(multiprocessing.cpu_count()-1, len(l_pile_of_work))
    l_processes = []
    for i in range(num_processes):
        p = multiprocessing.Process(
            name='myProcess-'+str(i),
            target=worker_multiprocessing_2,
            args=(q_pile_of_work, q_results),
            daemon=True)
        l_processes.append(p)
    del i
    for p in l_processes:
        p.start()
    print("started")

    # wait until all results have arrived (q_results was sized to len(l_pile_of_work))
    while not q_results.full():
        time.sleep(0.1)
    for p in l_processes:
        p.terminate()
        p.join()
        p.close()
    print("closed")
    del p

    l_results_unsorted = []
    while not q_results.empty():
        result = q_results.get()
        l_results_unsorted.append(result)
    l_results = sorted(l_results_unsorted)  # sort by id
    return l_results

def threading_1(l_pile_of_work: list, num_threads: int):
    """
    for I/O limited tasks
    """
    q_pile_of_work = queue.Queue(
        maxsize=len(l_pile_of_work))  # maxsize=0 -> unlimited
    for params in l_pile_of_work:
        q_pile_of_work.put(params)
    d_results = {}  # threads can write into dict

    l_threads = []  # List of threads, not used here
    for i in range(num_threads):
        t = threading.Thread(name='myThread-'+str(i),
                             target=worker_threading_1,
                             args=(q_pile_of_work, d_results),
                             daemon=True)
        l_threads.append(t)
        t.start()
    q_pile_of_work.join()  # wait for all threads to complete
    l_results_unsorted = d_results.values()
    l_results = sorted(l_results_unsorted)  # sort by id
    return l_results

if __name__ == '__main__':
    l_pile_of_work = []
    loops = 1_000
    for i in range(loops):
        # use index as first parameter
        L2 = (i, "n"+str(i))
        l_pile_of_work.append(L2)
    del L2, i
    time_start = time.time()
    # results = multiprocessing_1(l_pile_of_work)
    # # or
    # results = multiprocessing_2(l_pile_of_work)
    # # or
    results = threading_1(l_pile_of_work, num_threads=100)
    duration = time.time() - time_start
    print("%d sec = %.1f min" % (duration, duration/60))

    # for res in results:
    #     print('task %d, name %s was done in process %d' % (res))
    # print(len(results))

Threading V2: 18.04.2020

see [2]

import multiprocessing as mp  # for fetching number of CPUs
import threading
import concurrent.futures

def series_of_fits_multi_threading(data: list, fit_range: int = 7, max_days_past: int = 14) -> dict:
    fit_series_res = {}
    l_last_days_for_fit = range(0, -max_days_past, -1)
    with concurrent.futures.ThreadPoolExecutor(max_workers=mp.cpu_count()) as executor:
        # start the fit operations and mark each future with its data set
        list_future = {
            executor.submit(series_of_fits_worker_thread, data, fit_range, last_day_for_fit): last_day_for_fit
            for last_day_for_fit in l_last_days_for_fit
        }
        for future in concurrent.futures.as_completed(list_future):
            last_day_for_fit = list_future[future]
            d_this_fit_result = {}
            try:
                d_this_fit_result = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (last_day_for_fit, exc))
            if len(d_this_fit_result) != 0:
                fit_series_res[last_day_for_fit] = round(
                    d_this_fit_result['fit_res'][1], 1)
    return fit_series_res

def series_of_fits_worker_thread(data: list, fit_range: int, last_day_for_fit: int):
    # print(threading.current_thread().name, 'Starting')
    # fit_routine is not shown here
    d = fit_routine(
        data, (last_day_for_fit - fit_range, last_day_for_fit))
    return d
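
Since fit_routine is not shown above, here is a self-contained minimal sketch of the same submit/as_completed pattern; the function slow_square is made up for this example:

import concurrent.futures
import time

def slow_square(n: int) -> int:
    "stand-in for a real I/O-bound task"
    time.sleep(0.1)
    return n * n

results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # map each future back to its input, as in series_of_fits_multi_threading
    future_to_n = {executor.submit(slow_square, n): n for n in range(10)}
    for future in concurrent.futures.as_completed(future_to_n):
        n = future_to_n[future]
        try:
            results[n] = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (n, exc))
print(results)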


V1

import threading
import logging
import time
import multiprocessing as mp # for fetching number of CPUs

# Python's built-in data structures (lists, dictionaries, etc.) are thread-safe
# as a side effect of having atomic byte-codes for manipulating them
# -> so locking them is not required.
# Other data structures implemented in Python, or simpler types like integers and floats, don't have that protection.
# From inside a thread it is possible to mutate a shared dict, but not to rebind
# a plain integer (see the sketch after this example).

# Logging is nicer than print, as it can automatically add the threadname
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(threadName)s: %(message)s',
                    )

# List of "work" to do
listPileOfWork = [x for x in range(7)]

# Threads cannot write into integers, but into dictionaries they can!
# counter = 0
dic = {}
dic["Counter"] = 0

def worker():
    logging.info('Starting')
    # print(threading.currentThread().getName(), 'Starting')
    while listPileOfWork:
        x = listPileOfWork.pop(0)
        logging.info("Took " + str(x))
        dic["Counter"] += x
        time.sleep(2)
    logging.info('No more work to do')


threads = []  # List of threads

for i in range(mp.cpu_count()):
    t = threading.Thread(name='myThread'+str(i), target=worker)
    threads.append(t)
    t.start()

# Wait for all threads to complete
for t in threads:
    t.join()
logging.info('All threads done')
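
To make the dict-vs-integer point above concrete, a minimal sketch (the names counter2, dic2, and the worker functions are made up for this illustration): rebinding an integer inside a thread's function only creates a local name unless it is declared global, while mutating a shared dict entry is visible everywhere.

import threading

counter2 = 0            # a plain integer
dic2 = {"Counter": 0}   # a dictionary

def worker_int():
    counter2 = 1  # without 'global counter2' this only creates a local name

def worker_dict():
    dic2["Counter"] = 1  # mutates the shared dict in place

t1 = threading.Thread(target=worker_int)
t2 = threading.Thread(target=worker_dict)
t1.start(); t2.start()
t1.join(); t2.join()
print(counter2)         # still 0
print(dic2["Counter"])  # 1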

Multiprocessing

see [3]

V2

import multiprocessing
from os import getpid
import time

def worker(i, s):
    print(i)
    # print('I am number %d, %s in process %d' % (i, s, getpid()))
    time.sleep(0.1)
    return i, s, getpid()

if __name__ == '__main__':
    L1 = []
    for i in range(100):
        L2 = (i, "n" + str(i))
        L1.append(L2)
    # map for 1 argument, starmap for multiple arguments at the worker
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results_unsorted = pool.starmap(worker, L1)
    results = sorted(results_unsorted)
    for res in results:
        print('task %d, name %s was done in process %d' % (res))

V1

From [4]: The threading module uses threads, the multiprocessing module uses processes. The difference is that threads run in the same memory space, while processes have separate memory. This makes it a bit harder to share objects between processes with multiprocessing. Since threads use the same memory, precautions have to be taken or two threads will write to the same memory at the same time. This is what the global interpreter lock is for. Spawning processes is a bit slower than spawning threads. Once they are running, there is not much difference.
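
To see the memory difference directly, a minimal sketch (the names shared and append_one are made up for this illustration): a thread writes into the parent's list, while a subprocess only writes into its own copy.

import multiprocessing
import threading

shared = []

def append_one():
    shared.append(1)

if __name__ == '__main__':
    t = threading.Thread(target=append_one)
    t.start()
    t.join()
    print(shared)  # [1] -> the thread wrote into the parent's memory

    p = multiprocessing.Process(target=append_one)
    p.start()
    p.join()
    print(shared)  # still [1] -> the process appended to its own copy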

Example from [5]

Problem: KeyboardInterrupt (CTRL+C) does not terminate the program cleanly; see [6] as a starting point for a solution (a sketch of one common workaround follows the example below).

from multiprocessing import Pool

def escapes(cr, ci, it):
    """Does iterating z <- z^2 + c escape after it iterations?"""
    zr = 0.0
    zi = 0.0
    for i in range(it):
        # z <- z^2 + c
        zr, zi = zr * zr - zi * zi + cr, 2 * zr * zi + ci
        if zr * zr + zi * zi > 4:
            return True
    return False

def toChar(p):
    if p:
        return " "
    else:
        return "X"

def doRow(args):
    """Calculate one row of the output."""
    # Python 3 no longer supports tuple parameters, so unpack manually
    xmin, xmax, xstep, ymin, ymax, ystep, iterations, yc = args
    y = yc * (ymax - ymin) / ystep + ymin
    row = []
    for xc in range(xstep):
        x = xc * (xmax - xmin) / xstep + xmin
        row.append(escapes(x, y, iterations))
    return "".join([toChar(p) for p in row])

def mandel(xmin, xmax, xstep, ymin, ymax, ystep, iterations):
    """Calculate and print a Mandelbrot set."""
    pool = Pool()  # asks the OS for the number of CPUs ;-)
    results = []
    for yc in range(ystep):
        res = pool.apply_async(
            doRow, ((xmin, xmax, xstep, ymin, ymax, ystep, iterations, yc),))
        results.append(res)
    for yc in range(ystep):
        print(results[yc].get())

if __name__ == "__main__":
    mandel(-2.0, 1.0, 80, -1.0, 1.0, 24, 20000)
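
As a starting point for the CTRL+C problem mentioned above, a minimal sketch of one common workaround (not taken verbatim from [6]): let the worker processes ignore SIGINT, use map_async with a timeout so the parent stays interruptible, and terminate the pool on KeyboardInterrupt.

import signal
from multiprocessing import Pool

def init_worker():
    # the workers ignore SIGINT; only the parent reacts to CTRL+C
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def work(n):
    return n * n

if __name__ == "__main__":
    pool = Pool(initializer=init_worker)
    try:
        # a plain pool.map() can swallow CTRL+C; map_async().get(timeout)
        # keeps the parent responsive to KeyboardInterrupt
        print(pool.map_async(work, range(10)).get(timeout=60))
        pool.close()
    except KeyboardInterrupt:
        pool.terminate()
    finally:
        pool.join()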