Python

From Torben's Wiki
Jump to navigation · Jump to search


Template

Standard Template

See header documentation example 1 and example 2; see also the Google Python Style Guide.

#!/usr/bin/env python3

"""
Here comes the docstring containing the description of this piece of software
"""

# Built-in/Generic Imports
import os
import os.path  # os.path - The key to File I/O

# Libs
# import openpyxl

# Author and version info
__author__ = "Dr. Torben Menke"
__email__ = "https://entorb.net"
__maintainer__ = __author__
# __copyright__ = "Copyright 2020, My Project"
# __credits__ = ["John", "Jim", "Jack"]
__license__ = "GPL"
__status__ = "Dev"
__version__ = "0.1"

Object Oriented Template

#!/usr/bin/env python3

class myDevice:
    """Base class for devices: keeps a display name and a verbosity flag."""

    def __init__(self, devicename: str = "", verbose: bool = False) -> None:
        """Store the constructor arguments on the instance.

        Args:
            devicename: name of the device (e.g. for log messages).
            verbose: whether to log information or be quiet.
        """
        self.devicename, self.verbose = devicename, verbose

    def log(self, msg: str) -> None:
        """Print msg to stdout (the verbose flag is not consulted here)."""
        print(msg)
 
class SMU236(myDevice):
    """A myDevice that additionally remembers its GPIB address."""

    def __init__(self, gpibaddress: float, devicename: str = "", verbose: bool = False) -> None:
        """Initialise the myDevice part, then store gpibaddress.

        Args:
            gpibaddress: stored unchanged as self.gpibaddress.
            devicename: forwarded to myDevice.
            verbose: forwarded to myDevice.
        """
        super().__init__(devicename, verbose)
        self.gpibaddress = gpibaddress
 
if __name__ == "__main__":
    SMU = SMU236(1234)
    SMU.log("Starting")

Basics

Naming Conventions

Google Python Style Guide:

module_name, package_name, ClassName, method_name, ExceptionName, function_name, GLOBAL_CONSTANT_NAME, global_var_name, instance_var_name, function_parameter_name, local_var_name

linting / code formatting

use software for handling the code formatting like "black"

pip install black

then activate it in your editor, e.g. VS Code

Installing packages

python -m pip install --upgrade pip

pip install somemodule
# or 
pip3 install somemodule

# using a web proxy
# set proxy for windows cmd session
SET HTTPS_PROXY=http://myProxy:8080
(afterwards the --proxy setting below is no longer required)
or
pip install --proxy http://myProxy:8080 somemodule

# list outdated packages
pip list --outdated

# update package
pip install --upgrade pyinstaller

# downgrade
pip install --upgrade pandas==1.2.4

Variables

del var     # delete / undef a variable
var = None  # sets to null

# check if variable is defined
if "var" in locals():
    pass
# for object oriented projects:
if "var" in self.__dict__.keys():
    pass

Access global variables in functions

var = 123
def test():
    global var # point to global instead of creation of local var
    var = 321

Strings

# num <-> str
s = str (i) # int to string
f = float(s) # str -> float
i = int(s)
str(round(f, 1)) # round first 
# tests
s.isdigit() # 0-9
# note: isdecimal() also does not match '1.1'

Modify Strings

# get string from prompt
s = input("Enter Text: ")  

s = s.strip()  # trim spaces from both sides, rstrip for right only
s = s.lower()  # lower case
s = s.upper()  # upper case
s = s.title()  # upper case for first char of word

# upper case first letter of each word and also removes multiple and trailing spaces
import string
s = string.capwords(s)

# replace
s.replace(x, y)

# trim whitespaces from left and right
s.strip()

# replace all (multiple) whitespaces by single space ' '
s = " ".join(s.split())

# generate key value pairs from dict
# key1=value1&key2=value2
param_str = "&".join("=".join(tup) for tup in dict.items())

# repeat string multiple times
s * 5  # = s+s+s+s+s

substrings

# find a substring:
x in s
> True / False

if len(s) > 0

# handling substrings
a = "abcd"
b = a[:1] + "o" + a[2:] 
> 'aocd'

myString="Hello there !bob@"
i1 = myString.find("!")+1
i2 = myString.find("@")
mySubString=myString[i1:i2]

def substr_between(s: str, s1: str, s2: str) -> str:
    """Return the part of s between the first s1 and the next s2 after it.

    Args:
        s: string to search in.
        s1: literal start marker (not included in the result).
        s2: literal end marker (not included in the result).

    Returns:
        The text between the end of the first s1 and the first s2 that
        follows it; "" if s2 follows s1 directly.

    Raises:
        AssertionError: if s1 or s2 is not contained in s, or if no s2
            occurs after s1.
    """
    assert s1 in s, f"E: can't find '{s1}' in '{s}'"
    assert s2 in s, f"E: can't find '{s2}' in '{s}'"
    i1 = s.find(s1) + len(s1)
    # search for s2 only behind s1, so an earlier s2 does not hide a later one
    i2 = s.find(s2, i1)
    assert i2 != -1, f"E: '{s1}' not before '{s2}' in '{s}'"
    return s[i1:i2]

Binary, formatted, raw strings

# Binary Strings
key = b'asdf'
# or
key = str.encode('asdf')
s = key.decode('ascii')  # decode binary strings
key = s.encode('ascii')  # encode string to binary
# Formatted string
s = f''
# raw string: no escape of \ needed, but the literal must not END with a
# single backslash (r'c:\Windows\' is a syntax error), so append it separately
s = r'c:\Windows' + '\\'
# convert utf-8 to html umlaute (numeric character references)
lk_name = "Nürnberg".encode('ascii', 'xmlcharrefreplace').decode()
# -> N&#252;rnberg

merge variables in string / sprintf

print ("Renner =", i)
print ("Renner = %3d" % i) # right-aligned to width 3 with spaces; use %03d for leading 0's
print (f"Renner = {i}")

# place formatted numbers in a string / sprintf
"The %03i %s cost %f euros" % (3, "beers", 11.50)
> 'The 003 beers cost 11.500000 euros'

"The length is %.2f meters" % 72.8958
>'The length is 72.90 meters'

p= "%.1f%%/min" % precent

Lists

like arrays in Perl

L = [1,2,3,4,5,6]
L = [x for x in range(10)]
L = "Word1 Word2 Word3".split() # split by spaces, like QW in Perl, use split(",") to split on ","
len(L)
L[0:10] # get elements 0-9 (the end index is exclusive)
for a in L:
  ...

ATTENTION: = does not create a clone but a second reference to the same list

M = L         # M is just another name for the very same list object
# clones (shallow copies) can be achieved via:
M = L.copy()  # clones L (without the parentheses M would be the bound method)
M = L[:]      # clones L
M = list(L)   # clones L
L.append(x) # append a single element
L.extend(M) # put elements of list M to the end of List L
L.insert(i, x) # insert item x at position int i
L.pop() # returns and removes the last item
L.pop(i) # returns and removes the item at position int i
L.reverse()
L = sorted (L, key=str.casefold) # case insentitive / ignore case
L.remove(x) # removes the first occurrence of item x
L.count(x) # how many items x are in the list
L.index(x) # gives the position of the first x in list
s="".join(L)
x in L 
x not in L
# search for first match in list:
i = l_cont.index(1234)
line_footer = l_cont.index("")
# list to string
s = "\n".join(L)

# string to list
L = s.split("\n")

Initialize an "empty" list of a certain length, consisting of that number of None elements:

l = [None] * 10

merge 2 lists to list of tuples

data = list(zip(data_x, data_y))

Cartesian product of lists / tuples

import itertools
for i in itertools.product(*listOfLists):
    print(i)

# remove duplicate values from list
myList = list(dict.fromkeys(myList))
# via set
myUniqueValues = set(myDict.values())


list -> unique list

mySet = set(myList)

Multi-Level-Loop

  1. hard coded 3 levels:
for i3 in range(20):
    for i2 in range(20):
        for i1 in range(20):
            l = i1, i2, i3
            print(l)
  2. general approach:
import itertools as it
for tup in it.product(range(20), repeat=3):
   print(tup)

Looping over lists

modify each item in list by adding constant string

l = [s + ';' + v for v in l]

modify item in list

for idx, line in enumerate(cont):
  if "K1001/1" in line:
    line = "K1001/1 Test Nr " + str(i) + "\n"
    cont[idx] = line
    break

remove empty values from end

while L[-1] == "":
  # L = L[0:-1]
  L.pop()

modify or even remove certain items

# from https://stackoverflow.com/a/6024599
# iterates in-situ via index and reversing
for i in range(len(somelist) - 1, -1, -1):
    element = somelist[i]
    do_action(element)
    if check(element):
        del somelist[i]

Multi Dim Lists

lAllPoints = []
# append() is a method and must be called; assigning to it raises AttributeError
lAllPoints.append(["a", "b", "c"])
# or using tuple
lAllPoints.append(("a", "b", "c"))

sort multidim list

lAllPoints = sorted(lAllPoints, key=lambda x: x[0], reverse=False)
data_all = sorted(data_all, key=lambda row: (row["Wann"], row["Wer"]), reverse=False)

Tuples

Ordered sequence, with no ability to replace or delete items

L = (1,2,3,4,5,6)

list -> tuple

l = tuple(l)

combine 2 tuples

l = la + lb

Dictionaries

like hash in Perl

d = {"keyx": x, "keyy": y}
d["keyz"] = z
d.keys()
["keyx", "keyy", "keyz"]
del d["keyy"]

len(d)
d.clear()
d.copy()
d.keys()
d.values()
d.items()  # returns a list of tuples (key, value)
d.get(k)  # returns value of key k
d.get(k, x)  # returns value of key k; if k is not in d it returns x
d.pop(k)  # returns and removes item k
d.pop(k, x)  # returns and removes item k; if k is not in d it returns x
x in d
x not in d

# loop over all keys and retrieve their values as well
for key, value in d.items():
    print(f"{key} = {value}")

# sort keys:
for userid in sorted(dict.keys()):
    pass
# sort values reversed
for id, value in sorted(d.items(), key=lambda item: item[1], reverse=True):
    pass

join / merge 2 dicts
d.update(d2)

MultiDim Dictionaries

dicProductivity = {}
dicProductivity["Cursor"] = {}
dicProductivity["Cursor"]["Nr"] = 1
dicProductivity["Cursor"]["Prod"] = 1.909e18
dicProductivity["Cursor"]["Cost"] = 0
dicProductivity["Cursor"]["Img"] = "templates/Shop01Cursor.png"
dicProductivity["Grandma"] = {}
dicProductivity["Grandma"]["Nr"] = 2
dicProductivity["Grandma"]["Prod"] = 1.725e18
dicProductivity["Grandma"]["Cost"] = 0
dicProductivity["Grandma"]["Img"] = "templates/Shop02Grandma.png"

for k in dicProductivity.keys():
    print(k)
    if "Img" in dicProductivity[k]:
        print("ja")

Alternatively one can use a tuple as key for dictionary:

d = fetchDataAsDict()
myTuple = (d["description"], d["meaning"], d["source"], d["fileName"])
dict_with_tuple_as_key[myTuple] = value

Loops

for / while controls

break    = exit loop
continue = cancel current iteration and go to start of next iteration

ATTENTION: The loops do not create a new variable scope. Only functions and modules introduce a new scope!

for i in range (10):
    print(i)
del (i)
while i <= 100:
   i+=1
   ...
   if sth:
       break

for i in range(1, 5):
    print(i)  # print is a function in Python 3
    if sth:
        continue

for f in list :

inline if (requires a dummy else):

print("something") if self.verbose else 0

methods/functions

example:
def get_labeled_exif(exif: dict) -> dict:
    """Return a new dict with every key of exif replaced by TAGS.get(key).

    The values are carried over unchanged. TAGS is defined outside this
    function (presumably PIL.ExifTags.TAGS, mapping numeric EXIF ids to
    readable names — confirm at the import site).
    """
    return {TAGS.get(tag_id): value for tag_id, value in exif.items()}

asserts function argument validation

aus Python Kurs von Carsten Knoll

def eine_funktion(satz, ganzzahl, zahl2, liste):
    """Demonstrate several ways of validating function arguments.

    Args:
        satz: must be a str.
        ganzzahl: must be an int.
        zahl2: must be > 0.
        liste: must be a tuple or a list.

    Returns:
        -1, -2 or -3 (after printing a message) if satz, ganzzahl or liste
        has the wrong type; None if every check passes.

    Raises:
        AssertionError: if zahl2 is not > 0.
    """
    # isinstance() instead of type() == ..., consistent with the checks below
    if not isinstance(satz, str):
        print("Datentpyfehler: satz")
        return -1
    if not isinstance(ganzzahl, int):
        print("Datentpyfehler: ganzzahl")
        return -2
    if not isinstance(liste, (tuple, list)):
        print("Datentpyfehler: liste")
        return -3
    # Most compact variant (recommended): AssertionError if not fulfilled
    assert zahl2 > 0, "Error: zahl2 ist nicht > 0"
def F(x):
    """Return x squared.

    Raises:
        ValueError: if x is neither a float nor an int; the message names
            the type that was received.
    """
    if isinstance(x, (float, int)):
        return x**2
    raise ValueError(f"Zahl erwartet, {type(x)} bekommen")

better:

def F(x):
    """Return x squared; x must be a float or an int.

    Raises:
        AssertionError: if x has any other type.
    """
    is_number = isinstance(x, (float, int))
    assert is_number, "Error: x is not of type float or int"
    return pow(x, 2)
assert variant in [
    "normal",
    "gray",
    "cannyedge",
], "Error: variant is not in 'normal', 'gray', 'cannyedge'"

Imports

import sys
import datetime
import time
import math
import random
import os.path
# Import my files
import MyFile # without tailing .py
# import a file, not stored in the same folder
import sys
sys.path.append("../libs/MyFile ")

Math

see Python - Math for linear regression

Python 2: get rid of the annoying integer division: [1]

from __future__ import division

Modulo

15 % 4
--> 3

Random

import random
random.randint(1000000, 9999999)

Date and Time

import datetime as dt
# from datetime import date, datetime, timedelta
date_a = dt.date.fromisoformat("2020-03-10")
date_today = dt.date.today()
date_yesterday = dt.date.today() - dt.timedelta(days=1)
dt_today = dt.datetime.today()

[2]
parsing iso dates

# NOTE: the results must not be named "dt" - that would rebind the module
# alias from "import datetime as dt" and break every following line
my_dt = dt.datetime.fromtimestamp(myTimestamp)
my_dt = dt.datetime.fromisoformat("2017-01-01T12:30:59.000000")
my_dt = dt.datetime.fromisoformat(iso_str[:-1])  # to remove "Z" from end
my_dt = dt.datetime.fromisoformat("2020-03-10 06:01:01+00:00")
datestr = dt.date.today().strftime("%y%m%d")
datestr = dt.datetime.today().strftime("%y%m%d-%H%M")
# now in UTC
date = dt.datetime.now(dt.timezone.utc)
# now in UTC without milliseconds, e.g. 2020-03-10T06:01:01Z
# (datetime.utcnow() is deprecated since Python 3.12)
date = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


rounding datetime

from datetime import datetime, timedelta
def floor_dt_minutes(dt: datetime, res: int = 5) -> datetime:
    """Round dt down to a multiple of res minutes within its hour.

    Seconds and microseconds of the result are set to 0.
    """
    surplus = dt.minute % res  # minutes beyond the last multiple of res
    return dt.replace(minute=dt.minute - surplus, second=0, microsecond=0)

def ceil_dt_minutes(dt: datetime, res: int = 5) -> datetime:
    """Round dt up to the next multiple of res minutes.

    A dt that already lies exactly on a multiple of res minutes (with zero
    seconds and microseconds) is returned unchanged instead of being pushed
    up by a full step. Rounding up may carry over into the next hour or day.
    """
    floored = dt.replace(minute=res * (dt.minute // res), second=0, microsecond=0)
    if floored == dt:
        # already on the grid: the ceiling of an exact value is the value itself
        return dt
    return floored + timedelta(minutes=res)

def round_dt_minutes(dt: datetime, res: int = 5) -> datetime:
    """Round dt to the nearest multiple of res minutes.

    Only minutes and seconds are taken into account (microseconds are
    ignored); the number of steps is determined with the built-in round().
    The result may carry over into the next hour.
    """
    hour_start = dt.replace(minute=0, second=0, microsecond=0)
    minutes_dec = dt.minute + dt.second / 60  # minutes incl. fraction
    steps = round(minutes_dec / res)
    return hour_start + timedelta(minutes=res * steps)

dt = datetime.fromisoformat('2020-03-10 06:01:01+00:00')
print(f"original: {dt}")
print(f"floored: {floor_dt_minutes(dt,5)}")
print(f"ceileded: {ceil_dt_minutes(dt,5)}")
print(f"rounded: {round_dt_minutes(dt,5)}")

timing

measure time elapsed

import time
timestart = time.time()
...
print(time.time() - timestart)

calculate time

import time
duration = 1234  # sec
# print is a function in Python 3; the output is unchanged: "ETA = <ctime>"
print("ETA =", time.ctime(time.time() + duration))
array = time.localtime(time.time() + duration)  # struct_time of the ETA

Exceptions

Catch keyboard interrupt and do a "safe exit"

try:
  FILE = open("out.txt","w")
  while 1:
    i+=1
    print i
except KeyboardInterrupt:
  FILE.close()

Catch all exceptions

try:
    [...]  # placeholder for the code that may raise
except Exception as e:  # Python 3 syntax ("except Exception, e" is Python 2 only)
    print("Exception raised: ", e)

Custom Exceptions

try: 
  raise Exception("HiHo")

Math: curve fitting

from[3]

import numpy as np

# curve-fit() function imported from scipy
from scipy.optimize import curve_fit
from matplotlib import pyplot as plt

# Test function with coefficients as parameters
def fit_function(x, a, b):
    """Exponential model a * exp(b * x) with the coefficients a and b."""
    exponent = b * x
    return a * np.exp(exponent)

p0 = [data_y[-1], 0.14]  # initial guess of parameters
param, param_cov = curve_fit(
    fit_function, data_x, data_y, p0, bounds=((-np.inf, -np.inf), (np.inf, np.inf))
)

print(f"Coefficients:\n{param}")
print(f"Covariance of coefficients:\n{param_cov}")

data_y_fit = []
for x in data_x:
    y = fit_function(x, param[0], param[1])
    data_y_fit.append(y)
plt.plot(data_x, data_y, "o", color="red", label="data")
plt.plot(data_x, data_y_fit, "--", color="blue", label="fit")
plt.legend()
plt.show()

Regular Expressions

See [4]

See [5] for an online tester

multiple flags are joined via pipe |

s = re.sub("asdf.*", r"qwertz", s, flags=re.DOTALL | re.IGNORECASE)

matching

import re

# V0: simple 1
# raw string, so that \* reaches the regex engine instead of being an invalid
# Python string escape
myPattern = r"(/\*\*\* 0097_210000_0192539580000_2898977_0050 \*\*\*/.*?)($|/\*\*\*)"
myRegExp = re.compile(myPattern, re.DOTALL)
myMatch = myRegExp.search(cont)
assert myMatch is not None, f"golden file not found in file {filename}"
cont_golden = myMatch.group(1)

# V1: simple 2
# the pattern accepts exactly two lower case letters a-z
assert (
    re.match(r"^[a-z]{2}$", d_settings["country"]) is not None
), f'Error: country must be 2 lower case letters. We got: {d_settings["country"]}'


Match email
def checkValidEMail(email: str) -> bool:
    """Return True if email matches the address pattern.

    Otherwise "Error: invalid email" is printed and quit() is called.
    Pattern from https://stackoverflow.com/posts/719543/timeline bottom edit
    """
    pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
    if re.fullmatch(pattern, email) is not None:
        return True
    print("Error: invalid email")
    quit()
    return True

Find all

myMatches = re.findall('href="([^"]+)"', cont)
for myMatch in myMatches:
    print(myMatch)

substring

import re

# simple via search (raw string: \d and \. are regex escapes, not string escapes)
lk_id = re.search(r'^.*timeseries\-(\d+)\.json$', f).group(1)

# simple via sub
myPattern = "^.*" + s1 + "(.*)" + s2 + ".*$"
out = re.sub(myPattern, r"\1", s)

# more robust including an assert
def substr_between(s: str, s1: str, s2: str) -> str:
    """
    returns substring of s, found between strings s1 and s2
    s1 and s2 can be regular expressions

    The match between s1 and s2 is greedy, i.e. it extends to the LAST
    occurrence of s2 on the same line. The result is group 1 of the combined
    pattern, so s1 should not contain capturing groups of its own.

    Raises:
        AssertionError: if the combined pattern is not found in s.
    """
    myMatches = re.search(s1 + "(.*)" + s2, s)
    assert myMatches is not None, f"E: can't find '{s1}'...'{s2}' in '{s}'"
    return myMatches.group(1)
matchObj = re.search(r"(\d+\.\d+)", text)
if matchObj:
    # group(0) already is a str, no need for '%s' formatting
    price = float(matchObj.group(0))

Naming of match groups

(?P<name>...), see [6]

Search and Replace

From [7]
re.sub(regex, replacement, str) performs a search-and-replace across subject, replacing all matches of regex in str with replacement. The result is returned by the sub() function. The str string you pass is not modified.

s = re.sub("  +", " ", s)

Splitting

From [8]
split() splits a string into a list delimited by the passed pattern. The method is invaluable for converting textual data into data structures that can be easily read and modified by Python as demonstrated in the following example that creates a phonebook.

First, here is the input. Normally it may come from a file, here we are using triple-quoted string syntax:

>>> input = """Ross McFluff: 834.345.1254 155 Elm Street
...
... Ronald Heathmore: 892.345.3428 436 Finley Avenue
... Frank Burger: 925.541.7625 662 South Dogwood Way
...
...
... Heather Albrecht: 548.326.4584 919 Park Place"""

The entries are separated by one or more newlines. Now we convert the string into a list with each nonempty line having its own entry:

>>> entries = re.split("\n+", input)
>>> entries
['Ross McFluff: 834.345.1254 155 Elm Street',
'Ronald Heathmore: 892.345.3428 436 Finley Avenue',
'Frank Burger: 925.541.7625 662 South Dogwood Way',
'Heather Albrecht: 548.326.4584 919 Park Place']

Finally, split each entry into a list with first name, last name, telephone number, and address. We use the maxsplit parameter of split() because the address has spaces, our splitting pattern, in it:

>>> [re.split(":? ", entry, 3) for entry in entries]
[['Ross', 'McFluff', '834.345.1254', '155 Elm Street'],
['Ronald', 'Heathmore', '892.345.3428', '436 Finley Avenue'],
['Frank', 'Burger', '925.541.7625', '662 South Dogwood Way'],
['Heather', 'Albrecht', '548.326.4584', '919 Park Place']]

perl grep and map

from [9]

def grep(list, pattern):
    """Return the elements of list that pattern matches at their start.

    Matching uses re.match, i.e. the pattern is anchored at the beginning of
    each element.
    """
    matcher = re.compile(pattern).match
    return [item for item in list if matcher(item)]


def map(list, was, womit):
    """Return a new list with regex was replaced by womit in every element.

    Args:
        list: iterable of strings.
        was: regular expression to search for.
        womit: replacement, may contain backreferences.
    """
    # example arguments:
    # was = r'.*"(\d+)".*'
    # womit = r"\1"
    # NOTE: this function and its first parameter shadow the builtins map()
    # and list(), so neither can be called in here; the original body ended up
    # calling itself with too few arguments. A comprehension needs neither.
    return [re.sub(was, womit, elem) for elem in list]

unit testing using pytest

install via

pip install pytest

activate in vscode, see [10]: To enable testing, use the Python: Configure Tests command on the Command Palette.

see https://docs.pytest.org/en/6.2.x/assert.html#assert

test_1.py:

import myLib # my custom lib to test

class TestClass:
    """Example test class; pytest collects its test_* methods."""

    def test_one(self):
        """Plain membership check without any dependency."""
        text = "this"
        assert "h" in text

    def test_two(self):
        """Checks myLib.multiply for the arguments 1 and 2."""
        assert myLib.multiply(1, 2) == 2

    def test_three(self):
        # NOTE(review): expects 2 for multiply(2, 2) - presumably a
        # deliberately failing example; confirm against myLib
        assert myLib.multiply(2, 2) == 2

Sleep / Wait for input

sleep for a while

import time
time.sleep(60)

wait for user input

input("press Enter to close")

File Access

Basic

Split path into folder, filename, ext

import os
(dirName, fileName) = os.path.split(f)
(fileBaseName, fileExtension)=os.path.splitext(fileName)
fileOut = os.path.splitext(fileIn)[0] + "-out.txt"

Checking Operating System

import os
import sys

if os.name == "posix":
    print("posix/Unix/Linux")
elif os.name == "nt":
    print("windows")
else:
    print("unknown os")
    sys.exit(1)  # throws exception, use quit() to close / die silently

accessing os environment variables

Get filename of python script

from sys import argv
myFilename = argv[0]
import os
print(os.getenv("tmp"))

Command Line Arguments

Read filename from commandline parameter

import sys
for filename in sys.argv:

ArgumentParser

import argparse
parser = argparse.ArgumentParser()  # construct the argument parser and parse the arguments
# -h comes automatically

# Boolean Parameter
parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")  # store_true -> Boolean Value

# Choice Parameter
# restrict to a list of possible values / choices
# parser.add_argument("--choice", type=int, choices=[0, 1, 2], help="Test choices")

# Positional Parameter (like text.py 123)
# parser.add_argument("num", type=int, help="Number of things")

# Required Parameter
# parser.add_argument("-i", "--input", type=str, required=True, help="Path of input file")

# Optional Parameter
parser.add_argument("-n", "--number", type=int, help="Number of clicks")
# Optional Parameter with Default
parser.add_argument("-s", "--seconds", type=int, default=secDefault, help="Duration of clicking, default = %i (sec)" % secDefault)

args = vars(parser.parse_args())

if args["verbose"]:
    pass # do nothing
   # print ("verbosity turned on") 
if args["number"]:
    print("num=%i" % args["number"])

File Modifications

Copy File

shutil.copyfile(fileTemp,
                os.path.join(dest_path, fileOut))

Move/Rename file

os.rename(fileIn, fileDone)

Delete file

os.remove("file.txt")

Touch File

if os.path.exists(fname):
  os.utime(fname, None)
else:
  open(fname, "w").close()

File Meta Data

Get file size

import os
int (os.path.getsize("moinsen.txt") )

Read Timestamp (last modified)

lasttime = os.path.getmtime(fname)

Directories / Folders

Cross platform paths

currentdir = os.curdir
mysubdir = os.path.join(currentdir, "mysubdir")

Create Dir

os.makedirs(dir, exist_ok=True) # recursively: with all parents
# or
if not os.path.isdir(dir) :
  os.mkdir(dir)

Delete folder+contents

import shutil
shutil.rmtree(d)

Fetch Dir Contents / Loop over Files

Simple

Get list of files in directory, filter dirs from list, filter by ext

dir= "/path/to/some/dir"
listoffiles = [ f for f in os.listdir(dir) if os.path.isfile(os.path.join(dir ,f)) and f.lower()[-4:] == ".gpx"]
listoffiles.sort()

alternative loop via glob

import glob, os
os.chdir("/mydir")
for f in glob.glob("*.txt"):
    print(f)

or simply os.listdir:

import os
for f in os.listdir("/mydir"):
    if f.endswith(".txt"):
        print(os.path.join("/mydir", f))

new in 3.5: scandir

for f in os.scandir('./'):
    if f.is_file():
        (filename, fileext) = os.path.splitext(f.name)
Traverse in Subdirs
# walk into path and fetch all files matching extension .txt or .jpe?g
files = []
for (dirpath, dirnames, filenames) in os.walk("."):
    dirpath = dirpath.replace("\\", "/")
    for file in filenames:
        if file.endswith(".txt"):
            files.append(dirpath + "/" + file)
        elif re.search(r"\.jpe?g$", file, re.IGNORECASE):
            files.append(dirpath + "/" + file)

File Parsing

File General

File Read

Check if file / dir exists

import os.path  # os.path - The key to File I/O
os.path.exists("text.txt")
os.path.isfile("text.txt")
os.path.isdir("text")
os.path.isabs("/home/torben/text.txt") # Is it an absolute path

with open(fileCache, mode="r", encoding="utf-8") as fh:
    cont = fh.read()
    # or
    list = fh.readlines()
    # or
    line = fh.readline()
    # or
    for line in fh:
        print(line)

fh = open(filename, mode="r", encoding="utf-8")
...
fh.close()

File Write

fileOut = "out/1/out.txt"
(filepath, fileName) = os.path.split(fileOut)
# (fileBaseName, fileExtension) = os.path.splitext(fileName)
os.makedirs(filepath, exist_ok=True)  # = mkdir -p

with open(fileOut, mode="w", encoding="utf-8", newline="\n") as fh:
    # w = overWrite file ; a = append to file
    # If running Python in Windows, "\n" is automatically replaced by "\r\n". To prevent this use newline='\n'
    fh.writelines(list)  # no linebreaks
    # or
    fh.write("\n".join(list))
    # or
    for line in list:
        fh.write(line)

    # Force update of filecontents without closing it
    fh.flush()

# alternative
fh = open(fileOut, mode="w", encoding="utf-8", newline="\n")
...
fh.close()

INI Config File Reading

Config.ini

[Section1]
Cursor         = 205E18
Grandma        =  18E18
Farm           =  11E18
Mine           = 514E18
Factory        = 155E18

test.py

from configparser import ConfigParser

config = ConfigParser(
    interpolation=None
)  # interpolation=None -> treats % in values as char % instead of interpreting it
config.read("Config.ini", encoding="utf-8")

print(config.getint("Section1", "key1"))
print(config.getfloat("Section1", "key2"))
print(config.get("Section1", "key3"))

for sec in config.sections():
    d_settings = {}
    for key in config.options(sec):
        value = config.get(sec, key)
        d_settings[key] = value
        print("%15s : %s" % (key, value))

CSV

CSV Read

import csv

# note: utf-8-sig for UTF-8-BOM
with open("data/ref_selected_countries.csv", mode="r", encoding="utf-8") as fh:
    csv_reader = csv.DictReader(fh, dialect="excel", delimiter="\t")
    for row in csv_reader:
        print(f'\t{row["name"]} works in the {row["department"]} department')

CSV Write

plain writing

with open('data.tsv', mode='w', encoding='utf-8', newline='\n') as fh:
    csvwriter = csv.writer(fh, delimiter="\t")
    csvwriter.writerow(  
        ('Date', 'Confirmed')
    )
with open(filename + ".tsv", mode="w", encoding="utf-8", newline="\n") as fh:
    csvwriter = csv.DictWriter(
        fh,
        delimiter="\t",
        extrasaction="ignore",
        fieldnames=["date", "occupied_percent", "occupied", "total"],
    )
    csvwriter.writeheader()
    for d in myList:
        d["occupied_percent"] = round(100 * d["occupied"] / d["total"], 1)
        csvwriter.writerow(d)

JSON

JSON Read

with open(download_file, mode="r", encoding="utf-8") as fh:
    d_json = json.load(fh)

JSON Write

Write dict to file in JSON format, keeping utf-8 encoding

with open("my_file.json", mode="w", encoding="utf-8", newline="\n") as fh:
    json.dump(my_dict, fh, ensure_ascii=False, sort_keys=False, indent=2)

Excel

Excel Read

import openpyxl

workbook = openpyxl.load_workbook(
    pathToMyExcelFile, data_only=True
)  # data_only : read values instead of formulas
sheet = workbook["mySheetName"]
# or fetch active sheet
sheet = workbook.active
cell = sheet["A34"]
# or
cell = sheet.cell(row=34, column=1)  # index start here with 1
print(cell.value)
# or
print(sheet.cell(column=col, row=row).value)

Excel Write

import openpyxl

workbookOut = openpyxl.Workbook()
sheetOut = workbookOut.active
cellOut = sheetOut["A34"]
# or
cellOut = sheetOut.cell(row=i, column=j)  # index start here with 1
cellOut.value = "asdf"
workbookOut.save("out.xlsx")

Image/Picture/Photo Resize and Exif Modifying

from PIL import Image, ImageFilter  # pip install Pillow

fileIn = "2018-02-09 13.56.25.jpg"
# Read image
img = Image.open(fileIn)

PROBLEM:
PIL Image.save() drops the IPTC data like tags, keywords, copyright, ...
better using https://imagemagick.org instead when tags shall be kept

Resize

# Resize keeping aspect ratio -> img.thumbnail
# drops exif data, exif can be added from source file via exif= in save, see below
size = 1920, 1920
# Image.ANTIALIAS was an alias of LANCZOS and has been removed in Pillow 10
img.thumbnail(size, Image.LANCZOS)

Export file

fileOut = os.path.splitext(fileIn)[0] + "-edit.jpg"
try:
    img = Image.open(fileIn)
    img.save(fp=fileOut, format="JPEG", quality='keep')  # exif=dict_exif_bytes
    # JPEG Parameters
    # * quality : 'keep' or 1 (worst) to 95 (best), default = 75. Values above 95 should be avoided.
    # * dpi : tuple of integers representing the pixel density, (x,y)
except IOError:
    print("cannot write file '%s'" % fileOut)

Export Progressive / web optimized JPEG

from PIL import ImageFile  # for MAXBLOCK for progressive export
fileOut = os.path.splitext(fileIn)[0] + "-progressive.jpg"
try:
    img.save(fp=fileOut, format="JPEG", quality=80, optimize=True, progressive=True)
except IOError:
    ImageFile.MAXBLOCK = img.size[0] * img.size[1]
    img.save(fp=fileOut, format="JPEG", quality=80, optimize=True, progressive=True)

JPEG Meta Data: EXIF and IPTC

IPTC: Tags/Keywords
from iptcinfo3 import IPTCInfo  # this works in python 3!
iptc = IPTCInfo(fileIn)
if len(iptc['keywords']) > 0:  # or supplementalCategories or contacts
    print('====> Keywords')
    for key in sorted(iptc['keywords']):
        s = key.decode('ascii')  # decode binary strings
        print(s)
EXIF via piexif
import piexif  # pip install piexif
exif_dict = piexif.load(img.info['exif'])
print(exif_dict['GPS'][piexif.GPSIFD.GPSAltitude])
# returns list of 2 integers: value and denominator  -> v / d
# (340000, 1000) => 340m
# (51, 2) => 25.5m

# Modify altitude
exif_dict['GPS'][piexif.GPSIFD.GPSAltitude] = (140, 1)  # 140m

# write to file
exif_bytes = piexif.dump(exif_dict)
fileOut = os.path.splitext(fileIn)[0] + "-modExif.jpg"
try:
    img.save(fp=fileOut, format="jpeg", exif=exif_bytes, quality='keep')
except IOError:
    print("cannot write file '%s'" % fileOut)

or

exif_dict = piexif.load(fileIn)
for ifd in ("0th", "Exif", "GPS", "1st"):
    print("===" + ifd)
    for tag in exif_dict[ifd]:
        print(piexif.TAGS[ifd][tag]["name"], "\t",
              tag, "\t", exif_dict[ifd][tag])
print(exif_dict['0th'][306]) # 306 = DateTime
EXIF via exifread
# Open image file for reading (binary mode)
fh = open(fileIn, "rb")
# Return Exif tags
exif = exifread.process_file(fh)
fh.close()
# for tag in exif.keys():
#     if tag not in ('JPEGThumbnail', 'TIFFThumbnail', 'Filename', 'EXIF MakerNote'):
#         print("%s\t%s" % (tag, exif[tag]))
print(exif["Image DateTime"])
print(exif["GPS GPSLatitude"])
print(exif["GPS GPSLongitude"])
EXIF GPS via PIL
# from https://developer.here.com/blog/getting-started-with-geocoding-exif-image-metadata-in-python3
def get_exif(filename):
    """Open the image filename, verify it and return image._getexif().

    The context manager closes the image even if verify() raises, and the
    EXIF data is fetched before the image is closed.

    NOTE(review): _getexif() is a private Pillow method; presumably it
    returns a dict keyed by numeric EXIF tag id, or None - confirm.
    """
    with Image.open(filename) as image:
        image.verify()
        return image._getexif()


def get_labeled_exif(exif):
    """Return a new dict mapping TAGS.get(key) to the value of each exif item.

    TAGS is defined outside this function (presumably PIL.ExifTags.TAGS -
    confirm at the import site).
    """
    return {TAGS.get(tag_id): value for tag_id, value in exif.items()}


def get_geotagging(exif):
    """Collect the entries of the "GPSInfo" EXIF block under readable names.

    For the TAGS id labelled "GPSInfo", every key of GPSTAGS that is present
    in exif[id] is copied into the result under its GPSTAGS name.

    Raises:
        ValueError: if exif is empty/None or holds no "GPSInfo" entry.
    """
    if not exif:
        raise ValueError("No EXIF metadata found")
    geotagging = {}
    for idx, tag in TAGS.items():
        if tag != "GPSInfo":
            continue
        if idx not in exif:
            raise ValueError("No EXIF geotagging found")
        for gps_key, gps_name in GPSTAGS.items():
            if gps_key in exif[idx]:
                geotagging[gps_name] = exif[idx][gps_key]
    return geotagging


def get_decimal_from_dms(dms, ref):
    """Convert degrees/minutes/seconds into decimal degrees, rounded to 5 digits.

    Args:
        dms: three (numerator, denominator) pairs for degrees, minutes and
            seconds.
        ref: hemisphere letter; "S" and "W" give a negative result.
    """
    degrees = dms[0][0] / dms[0][1]
    minutes = dms[1][0] / dms[1][1] / 60.0
    seconds = dms[2][0] / dms[2][1] / 3600.0
    sign = -1.0 if ref in ("S", "W") else 1.0
    return round(sign * degrees + sign * minutes + sign * seconds, 5)


def get_coordinates(geotags):
    """Return (latitude, longitude) as computed by get_decimal_from_dms.

    Reads the keys GPSLatitude, GPSLatitudeRef, GPSLongitude and
    GPSLongitudeRef from geotags.
    """
    latitude = get_decimal_from_dms(geotags["GPSLatitude"], geotags["GPSLatitudeRef"])
    longitude = get_decimal_from_dms(geotags["GPSLongitude"], geotags["GPSLongitudeRef"])
    return latitude, longitude


exif = get_exif(fileIn)
exif_labeled = get_labeled_exif(exif)
print(exif_labeled["DateTime"])

geotags = get_geotagging(exif)
print(get_coordinates(geotags))

Template Matching

see Python - CV2

Optical Character Recognition (OCR)

see Python - OCR

GPX parsing

import gpxpy
import gpxpy.gpx
# Elevation data by NASA: see lib at https://github.com/tkrajina/srtm.py
# the context manager closes the file as soon as it has been parsed
with open(gpx_file_path, 'r') as fh_gpx_file:
    gpx = gpxpy.parse(fh_gpx_file)
#  Loops for accessing the data
for track in gpx.tracks:
    for segment in track.segments:
        for point in segment.points:
            ...  # work with point here
for waypoint in gpx.waypoints:
    ...  # work with waypoint here
for route in gpx.routes:
    for point in route.points:
        ...  # work with point here
# interesting properties of point / waypoint objects:
point.time
point.latitude
point.longitude
point.source
waypoint.name

Templates/Snippets

Logging

V2: File and STDOUT

# 1. setup
import logging
from logging.handlers import RotatingFileHandler

logfile = "myApp.log"
maxBytes = 20 * 1024 * 1024
backupCount = 5
loglevel_console = logging.INFO
loglevel_file = logging.DEBUG

# create logger
logger = logging.getLogger("root")
logger.setLevel(loglevel_file)

# console handler
ch = logging.StreamHandler()
ch.setLevel(loglevel_console)

# rotating file handler
# fh = logging.FileHandler(logfile)
fh = RotatingFileHandler(logfile, maxBytes=maxBytes, backupCount=backupCount)
fh.setLevel(loglevel_file)

# create formatter and add it to the handlers
# %(name)s = LoggerName, %(threadName)s = ThreadName
formatter = logging.Formatter(
    "%(asctime)s - %(levelname)s - %(name)s - %(threadName)s - %(message)s "
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)

# add the handlers to the logger
logger.addHandler(ch)
logger.addHandler(fh)

logger.debug("DebugMe")
logger.info("Starting")
logger.warning("Attention")
logger.error("Something went wrong")
logger.critical("Something seriously went wrong ")

# 2. in other files/modules now use
import logging
import sys

# module-level logger; its records propagate to the handlers of the root logger
logger = logging.getLogger(__name__)
logger.info("text")

try:
    ...  # the actual work goes here
except Exception:
    # logger.exception() logs at ERROR level and appends the current traceback
    logger.exception("Unhandeled exception")
    # sys.exit() instead of quit(): quit() is a helper injected by the `site`
    # module for interactive use and exits with status 0 (success)
    sys.exit(1)

V1: STDOUT

import logging

# Logging is nicer than print, as it can automatically add the threadname
# basicConfig() attaches a handler to the root logger; it does nothing if the
# root logger already has handlers, so call it once, early in the program
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(levelname)s %(threadName)s: %(message)s",
)
logging.info("Starting")

Hexadecimal

see Python - Hex


Compile to .exe

pip install pyinstaller
pyinstaller --onefile --console your.py

(Python - py2exe is deprecated)

Progress Bar

see tqdm

from tqdm import tqdm

# wrapping any iterable in tqdm() prints a progress bar while iterating
for i in tqdm(range(10000)):
    ...  # do the actual work for item i here

CGI Web development

# Print necessary headers.
# The bare print() emits the empty line that ends the HTTP header section.
print("Content-Type: text/html")
print()

# errors and debugging info to browser
# NOTE: cgitb is deprecated since Python 3.11 and was removed in Python 3.13 (PEP 594)
import cgitb
cgitb.enable()


Access URL or Form Parameters

# V2 from https://www.tutorialspoint.com/python/python_cgi_programming.htm
# NOTE: the cgi module is deprecated since Python 3.11 and was removed in Python 3.13 (PEP 594)
import cgi
form = cgi.FieldStorage()
# getvalue() returns None if the parameter is not present
username = form.getvalue('username')
print(username)
# V1: parse the URL parameters by hand from the CGI environment
import os
import urllib.parse

# QUERY_STRING is unset when the script is not started by a web server
query = os.environ.get("QUERY_STRING", "")
# parse_qsl() splits on "&" and "=" first and percent-decodes afterwards, so
# encoded "&" / "=" inside a value cannot break the parsing; parameters
# without a value are kept as empty strings instead of raising
d = dict(
    urllib.parse.parse_qsl(query, keep_blank_values=True, errors="surrogateescape")
)
print(d)

CGI Backend Returning JSONs

#!/usr/local/bin/python3.6
# -*- coding: utf-8 -*-

import cgi
import json

# Print necessary headers.
# The bare print() emits the empty line that ends the HTTP header section.
print("Content-type: application/json")
print()

# parsed URL/form parameters of the current request
form = cgi.FieldStorage()


def get_form_parameter(para: str) -> str:
    """Return the value of the request parameter `para`.

    Raises:
        ValueError: if the parameter is missing or empty.
    """
    value = form.getvalue(para)
    # explicit raise instead of assert: asserts are stripped under `python -O`
    if not value:
        raise ValueError(f"Error: parameter {para} missing")
    return value


response = {}
response['status'] = "ok"

try:
    action = get_form_parameter("action")
    response['action'] = action
    if action == "myAction":
        ...

except Exception as e:
    # report any failure to the client as part of the JSON answer
    response['status'] = "error"
    d = {"type": str(type(e)), "text": str(e)}
    response["exception"] = d

finally:
    # always answer with a JSON document, in the success and in the error case
    print(json.dumps(response))

Databases

PostgreSQL

Basics

import psycopg2
import psycopg2.extras

# placeholder credentials; do not hard-code real passwords in source files
credentials = {
    "host": "localhost",
    "port": 5432,
    "database": "myDB",
    "user": "myUser",
    "password": "myPwd",
}
connection = psycopg2.connect(**credentials)
# DictCursor returns rows that can be converted to a dict of column -> value
cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)

# one list of accepted values per placeholder: first for ColA, second for ColB
l_bind_vars = [["A1", "A2"], ["B1", "B2"]]

# psycopg2 adapts a Python list to a PostgreSQL ARRAY, so a list parameter
# has to be compared via "= ANY(...)"; "= %s" would compare the column with
# the whole array
sql = """
SELECT * FROM myTable
WHERE 1=1 
AND status NOT IN ('CLOSED') 
AND ColA = ANY(%s) 
AND ColB = ANY(%s) 
ORDER BY created DESC
"""
cursor.execute(sql, l_bind_vars)
row = cursor.fetchone()
# fetchone() returns None when no row matches
d_data = dict(row) if row is not None else {}

export result to csv file

# Let the server render the query result as CSV and stream it into a file.
sql1 = "SELECT * FROM table"
# '\t' is a real tab character here, i.e. the output is tab separated
sql2 = "COPY (" + sql1 + ") TO STDOUT WITH CSV HEADER DELIMITER '\t'"
with open("out.csv", "w") as file:
    cursor.copy_expert(sql2, file)

ReadConfigFile to connect to PostgreSQL

from [11] database.ini

[postgresql]
host=dbhost
port=5432
database=dbname
user=dbuser
password=dbpass

config.py

from configparser import ConfigParser


def config(filename="database.ini", section="postgresql"):
    """Return the key/value pairs of one section of an INI file as a dict.

    Args:
        filename: path of the INI file to read.
        section: name of the section to return.

    Raises:
        Exception: if the section does not exist in the file (this is also
            the case when the file itself cannot be read).
    """
    ini = ConfigParser()
    ini.read(filename)
    if not ini.has_section(section):
        raise Exception(
            "Section {0} not found in the {1} file".format(section, filename)
        )
    return {key: value for key, value in ini.items(section)}

main.py

import psycopg2
from config import config


def connect():
    """Connect to the PostgreSQL server, print its version and disconnect.

    The connection parameters come from config(). Errors are printed, not
    raised; an opened connection is closed in every case.
    """
    connection = None
    try:
        db_params = config()  # read connection parameters
        print("Connecting to the PostgreSQL database...")
        connection = psycopg2.connect(**db_params)
        cursor = connection.cursor()
        print("PostgreSQL database version:")
        cursor.execute("SELECT version()")
        print(cursor.fetchone())
        cursor.close()
    except (Exception, psycopg2.DatabaseError) as err:
        print(err)
    finally:
        # connection is still None if psycopg2.connect() itself failed
        if connection is not None:
            connection.close()
            print("Database connection closed.")


if __name__ == "__main__":
    connect()

SQL Lite / SQLite

see page SQLite

Internet Access

Send E-Mails

see Python - eMail


Download file

in Python 3 this is the preferred way of downloading files:

import urllib.request

url = "https://pomber.github.io/covid19/timeseries.json"
# the context manager closes the HTTP response (and its socket) after reading
with urllib.request.urlopen(url) as filedata:
    datatowrite = filedata.read()
# binary mode: store the bytes exactly as received
with open("test.json", "wb") as fh:
    fh.write(datatowrite)

Download data using browser UA

import requests

# some servers reject the default python-requests User-Agent, so pose as a browser
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:75.0) Gecko/20100101 Firefox/75.0 ",
}
# requests has no default timeout: without one, the call waits forever for a
# server that never answers
resp = requests.get(url, headers=headers, timeout=30)
if resp.status_code != 200:
    raise Exception(
        f"E: bad response. status code:{resp.status_code}, text:\n{resp.text}"
    )

Download only if cache is too old

def fetch_url_or_cache(fileCache: str, url: str) -> str:
    """Return the content of `url`, using `fileCache` as a one-hour cache.

    If the cache file exists and is younger than 3600 s, its content is
    returned. Otherwise the URL is fetched, stored in the cache file and
    returned.
    """
    if check_cache_file_available_and_recent(
        file_cache=fileCache, max_age=3600, verbose=False
    ):
        with open(fileCache, mode="r", encoding="utf-8") as fh:
            return fh.read()

    s = fetch(url=url)
    with open(fileCache, mode="w", encoding="utf-8", newline="\n") as fh:
        # write(), not writelines(): s is one string, and writelines() would
        # iterate over it character by character
        fh.write(s)
    return s

def check_cache_file_available_and_recent(
    file_cache: str, max_age: int = 3600, verbose: bool = False
) -> bool:
    """Tell whether `file_cache` exists and was modified at most `max_age` seconds ago.

    With `verbose` set, the reason for rejecting the cache is printed.
    """
    if not os.path.exists(file_cache):
        if verbose:
            print(f"No Cache available: {file_cache}")
        return False

    age_in_seconds = time.time() - os.path.getmtime(file_cache)
    if age_in_seconds > max_age:
        if verbose:
            print(f"Cache too old: {file_cache}")
        return False

    return True

def fetch(url, timeout: float = 30.0) -> str:
    """Download `url` with a browser User-Agent and return the body as text.

    Args:
        url: address to download.
        timeout: seconds to wait for the server before giving up.

    Raises:
        Exception: if the server answers with a status code other than 200.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:75.0) Gecko/20100101 Firefox/75.0 ",
    }
    # requests has no default timeout and would otherwise wait forever
    resp = requests.get(url, headers=headers, timeout=timeout)
    if resp.status_code != 200:
        raise Exception(
            f"E: bad response. status code:{resp.status_code}, text:\n{resp.text}"
        )
    # UTF-8 is a superset of ASCII: pure ASCII pages decode as before, while
    # pages with non-ASCII characters no longer raise UnicodeDecodeError
    return resp.content.decode("utf-8")

Download HTML and extract elements

V2: via BeautifulSoup

from bs4 import BeautifulSoup  # pip install beautifulsoup4
# cont: the HTML document as string
soup = BeautifulSoup(cont, features='html.parser')
# find() returns the first matching element, or None if there is no match
myElement = soup.find("div", {"class": "user-formatted-inner"})
# prettify() renders the element as indented HTML text
myBody = myElement.prettify()
# myBody = myElement.encode()
# myBody = str(myElement)

V1: via lxml and xpath

from lxml import html
import requests

# Download a page and collect the cell texts of all table body rows.
page = requests.get(url)
tree = html.fromstring(page.content)
tbody_trs = tree.xpath("//*/tbody/tr")
l_rows = []
for tr in tbody_trs:
    # only rows with exactly 15 cells are kept
    if len(tr) != 15:
        continue
    # append each row once, after all its cells are read (the append used to
    # sit inside the cell loop and added a partial copy of the row per cell)
    l_columns = [td.text_content() for td in tr]
    l_rows.append(l_columns)

Call Rest API

def perform_rest_call_str(url: str, timeout: float = 30.0) -> str:
    """Send a GET request to `url` and return the response body as text.

    Args:
        url: address of the REST endpoint.
        timeout: seconds to wait for the server before giving up.

    Raises:
        Exception: if the server answers with a status code other than 200.
    """
    # requests has no default timeout and would otherwise wait forever
    resp = requests.get(url, timeout=timeout)
    if resp.status_code != 200:
        raise Exception(
            f"E: bad response. status code:{resp.status_code}, text:\n{resp.text}"
        )
    return resp.text

GUI Interactions

Take Screenshot

import numpy as np
import pyautogui # (c:\Python\Scripts\)pip install pyautogui
# pyautogui does only support screenshots on monitor #1
...
screenshot = pyautogui.screenshot()
# screenshot = pyautogui.screenshot(region=(screenshotX,screenshotY, screenshotW, screenshotH))
# convert the returned image into a numpy array
screenshot = np.array(screenshot)
# Convert RGB to BGR: reverse the last (colour channel) axis; copy() makes
# the result a contiguous array of its own instead of a view
screenshot = screenshot[:, :, ::-1].copy()

Mouse Actions

def clickIt(x, y, key=""):
    """Left-click at screen position (x, y) and move the mouse back afterwards.

    Args:
        x, y: screen coordinates to click at.
        key: optional modifier key (e.g. "ctrl", "shift") held during the click.
    """
    x0, y0 = pyautogui.position()  # remember where the mouse was
    if key:
        pyautogui.keyDown(key)
    try:
        pyautogui.moveTo(x, y, duration=0.2)
        pyautogui.click(x=x, y=y, button='left', clicks=1, interval=0.1)
    finally:
        # release the modifier even if moving/clicking fails, otherwise the
        # key would stay pressed for the whole desktop session
        if key:
            pyautogui.keyUp(key)
    pyautogui.moveTo(x0, y0)

Web Automation

from selenium import webdriver
from selenium.webdriver.common.keys import Keys

# from selenium.webdriver import Firefox
from selenium.webdriver.firefox.options import Options

import os
import time
import glob

class StravaUserMapDL():
    """Opens a web page in a Selenium-controlled Firefox and logs in if required."""

    def __init__(self):
        self.driver = webdriver.Firefox()

    def login(self):
        """Open the page, store its HTML in a file and log in if redirected.

        A login is assumed to be required when the browser ends up on a URL
        different from the requested one. The method then fills in the login
        form and waits until the browser is back on the requested URL.
        """
        # find_element(By.X, ...) replaces the find_element_by_x() helpers,
        # which were removed in Selenium 4.3
        from selenium.webdriver.common.by import By

        driver = self.driver
        url = "https://www.somewebpage.com"
        email = "myemail"
        password = "mypassword"
        driver.get(url)

        title = driver.title  # page title, shown for reference only
        urlIs = driver.current_url
        cont = driver.page_source  # as string
        # NOTE(review): `filename` is not defined in this class; it has to be
        # provided by the surrounding module
        # w = overwrite file ; a = append to file
        with open(filename, "w", encoding="utf-8") as fh:
            fh.write(cont)

        # handle login if urlIs != url
        if urlIs != url:
            # activate checkbox 'remember_me'
            elem = driver.find_element(By.ID, 'remember_me')
            if not elem.is_selected():
                elem.click()
            assert elem.is_selected()
            elem = driver.find_element(By.ID, 'email')
            elem.send_keys(email)
            elem = driver.find_element(By.ID, 'password')
            elem.send_keys(password)
            elem.send_keys(Keys.RETURN)
            # Wait until login pages is replaced by real page
            urlIs = driver.current_url
            while urlIs != url:
                time.sleep(1)
                urlIs = driver.current_url
            print(urlIs)

            # results = driver.find_elements(By.CLASS_NAME, 'following')
            # results = driver.find_elements(By.TAG_NAME, 'li')

            # print(results[0].text)
        assert urlIs == url

Unit Tests using Web Automation

import unittest
from selenium import webdriver
from selenium.webdriver.common.keys import Keys

#from selenium.webdriver import Firefox
from selenium.webdriver.firefox.options import Options

import os
import time

class PythonOrgSearch(unittest.TestCase):
    """Browser test: search python.org for "pycon" and expect results."""

    def setUp(self):
        """Start a fresh Firefox before every test."""
        print("setUp")
        # headless mode:
        # opts = Options()
        # opts.add_argument("-headless")
        # self.driver = webdriver.Firefox(options=opts)

        self.driver = webdriver.Firefox()

    def test_search_in_python_org(self):
        # find_element(By.X, ...) replaces the find_element_by_x() helpers,
        # which were removed in Selenium 4.3
        from selenium.webdriver.common.by import By

        driver = self.driver
        driver.get("http://www.python.org")
        self.assertIn("Python", driver.title)
        # "q" is the name of the search input field
        elem = driver.find_element(By.NAME, "q")
        elem.send_keys("pycon")
        elem.send_keys(Keys.RETURN)
        assert "No results found." not in driver.page_source
        print("fertig: python_org")

    def tearDown(self):
        """Close the browser after every test."""
        print("tearDown")
        print("close Firefox")
        self.driver.close()  # close tab
        self.driver.quit()  # quit browser
        # os._exit(1) # exit unittest without Exception
        # os._exit(1) # exit unittest without Exception


if __name__ == "__main__":
    try:
        unittest.main()
    except SystemExit as e:
        # unittest.main() always ends with SystemExit; its code is falsy when
        # all tests passed. Pass that on instead of always reporting failure.
        # os._exit() ends the process immediately, skipping normal cleanup.
        os._exit(1 if e.code else 0)

Cryptography and Hashing

Hashing via SHA256

def gen_SHA256_string(s: str) -> str:
    """Return the SHA256 hash of `s` as a hexadecimal string.

    The string is hashed in its UTF-8 encoding. For pure ASCII input this is
    identical to the ASCII encoding, but non-ASCII characters no longer raise
    UnicodeEncodeError.
    """
    import hashlib

    return hashlib.sha256(s.encode("utf-8")).hexdigest()

Hashing via MD5

(MD5 is not secure, better use SHA256)

def gen_MD5_string(s: str) -> str:
    """Return the MD5 hash of `s` as a hexadecimal string.

    MD5 is not secure; use it for checksums only, never for passwords. The
    string is hashed in its UTF-8 encoding. For pure ASCII input this is
    identical to the ASCII encoding, but non-ASCII characters no longer raise
    UnicodeEncodeError.
    """
    import hashlib

    return hashlib.md5(s.encode("utf-8")).hexdigest()

Password hashing via bcrypt

import bcrypt
pwd = 'geheim'
# bcrypt works on bytes, not on str
pwd = pwd.encode("utf-8")
# or 
pwd = b'geheim'

# gensalt() creates a new random salt, which becomes part of the returned hash
hashed = bcrypt.hashpw(pwd, bcrypt.gensalt())
# checkpw() hashes pwd with the salt stored in `hashed` and compares the results
if bcrypt.checkpw(pwd, hashed):
    print("It Matches!")
    print(hashed.decode("utf-8"))

To use version 2a instead of 2b (default):

bcrypt.gensalt(prefix=b"2a")

Multiprocessing, subprocesses and Threading

see Python_-_Multithreading as well

use processes for CPU limited work
use threads for I/O limited work

Simple single process

import subprocess

# Run one external command, wait for it and capture its output as text.
du_command = ["sudo", "du", "--max-depth=1", mydir]
process = subprocess.run(du_command, capture_output=True, text=True)
print(process.stdout)

old, deprecated way:

os.system( "gnuplot " + gpfile)


Multiprocessing

see Python - Multithreading as well

V2 using pool and starmap

import multiprocessing
import os

def worker(i: int, s: str) -> tuple:
    """Process one work item and return (i, s, pid of the executing process)."""
    result = (i, s, os.getpid())
    return result

if __name__ == "__main__":
    # gen. pile of work: one (i, s) argument tuple per task
    l_pile_of_work = [(i, "n" + str(i)) for i in range(1_000)]
    # gen pool of processes: never more processes than work items
    num_processes = min(multiprocessing.cpu_count(), len(l_pile_of_work))
    # the context manager shuts the worker processes down when the block is
    # left; starmap() blocks until all results are available, so nothing is lost
    with multiprocessing.Pool(processes=num_processes) as pool:
        # start processes on pile of work
        l_results_unsorted = pool.starmap(
            func=worker, iterable=l_pile_of_work  # each item is a tuple of 2 parameters
        )

        # or if only one parameter:
        # l_results_unsorted = pool.map(doit_de_district, l_pile_of_work)

    l_results = sorted(l_results_unsorted)  # sort by i

V1

import subprocess
l_subprocesses = []  # queue list of subprocesses
max_processes = 4

def process_enqueue(new_process_parameters):
    """Start a subprocess as soon as fewer than max_processes are queued.

    While the queue is full, finished processes are removed from it every
    0.1 s. The started process is appended to l_subprocesses.
    """
    # wait for free slot
    while not len(l_subprocesses) < max_processes:
        process_remove_finished_from_queue()
        time.sleep(0.1)  # sleep 0.1s
    l_subprocesses.append(
        subprocess.Popen(
            new_process_parameters,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    )

def process_remove_finished_from_queue():
    """Print the output of all finished subprocesses and remove them from the queue."""
    still_running = []
    for process in l_subprocesses:
        # poll() must be *called*: it returns None while the process is
        # running. The bare attribute `process.poll` is never None, which
        # made every process look finished.
        if process.poll() is None:
            still_running.append(process)
        else:
            process_print_output(process)
    # replace the content in place so all references to the list stay valid
    l_subprocesses[:] = still_running

def process_print_output(process):
    """Wait for `process` to finish and print its non-empty stdout / stderr."""
    stdout, stderr = process.communicate()
    # truthiness covers both "" (no output) and None (stream not captured)
    if stdout:
        print(f'Out: {stdout}')
    if stderr:
        print(f'ERROR: {stderr}')

def process_wait_for_all_finished():
    """Wait for every queued subprocess, print its output and empty the queue."""
    global l_subprocesses
    for queued_process in l_subprocesses:
        # blocks until this process has ended
        process_print_output(queued_process)
    # start over with a fresh, empty queue
    l_subprocesses = []

process_enqueue(l_parameters1)
...
process_enqueue(l_parameters999)
process_wait_for_all_finished()

Threading

import threading
import queue
import os
import time

def worker(q_work: queue.Queue, results: dict):
    """Process (i, s) items from `q_work` until it is empty.

    For every item the tuple (i, s, pid) is stored in results[i].
    """
    while True:
        # get_nowait() instead of empty() + get(): another thread may take the
        # last item between the two calls, and get() would then block forever
        try:
            i, s = q_work.get_nowait()
        except queue.Empty:
            return
        try:
            time.sleep(.1)
            result = (i, s, os.getpid())
            results[i] = result
        finally:
            # always mark the item as done, otherwise q_work.join() never returns
            q_work.task_done()

if __name__ == '__main__':
    d_results = {}  # threads can write into dict
    # gen. pile of work: one (i, s) tuple per task
    l_pile_of_work = [(i, "n"+str(i)) for i in range(1_000)]
    # convert list of work to queue
    # maxsize=0 -> unlimited
    q_pile_of_work = queue.Queue(maxsize=len(l_pile_of_work))
    for params in l_pile_of_work:
        q_pile_of_work.put(params)
    # gen threads
    num_threads = 100
    l_threads = []  # List of threads, not used here
    for i in range(num_threads):
        t = threading.Thread(
            name=f"myThread-{i}",
            target=worker,
            args=(q_pile_of_work, d_results),
            daemon=True,
        )
        l_threads.append(t)
        t.start()
    # wait until every queued item has been marked as done
    q_pile_of_work.join()
    l_results_unsorted = d_results.values()
    l_results = sorted(l_results_unsorted)  # sort by i


asyncio — Asynchronous I/O

see https://docs.python.org/3/library/asyncio-task.html

import asyncio
import time

# basics
# task = asyncio.create_task(coro())
# Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.

# sleep
# await asyncio.sleep(1)

# Running Tasks Concurrently and gathers the return values in list L
# L = await asyncio.gather(coro(x1,y1), coro(x2,y2), coro(x3,y3))

async def say_after(delay, what):
    """Wait `delay` seconds without blocking the event loop, then print `what`."""
    # Coroutines declared with the async/await syntax
    await asyncio.sleep(delay)
    print(what)

async def main():
    """Run two say_after() coroutines concurrently and print start and end time."""
    # asyncio.create_task() schedules the coroutine right away as a Task
    hello_task = asyncio.create_task(say_after(1, 'hello'))
    world_task = asyncio.create_task(say_after(1, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed
    for pending_task in (hello_task, world_task):
        await pending_task

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Pandas

import pandas as pd

create

# empty
df = pd.DataFrame()

# from 1 dim list
df = pd.DataFrame(data={'Deaths_Covid_2020': l})
# from N dim list
df = pd.DataFrame(data=l_results, columns=('num_songs_played', 'pct_all_played', 'pct_80pct_played'))
# from multiple lists
data = zip(l_dates, l_deaths2016, l_deaths2017,
           l_deaths2018, l_deaths2019, l_deaths2020)
df = pd.DataFrame(data, columns=['Day', '2016', '2017',
                                 '2018', '2019', '2020'])
# from CSV
df = pd.read_csv('data/de-states/de-state-DE-total.tsv', sep="\t")

# from Excel (requires: pip install openpyxl)
# passing the path lets pandas open and close the file itself; a handle
# created via open() would never be closed
df = pd.read_excel(
    excelFile, sheet_name="COVID_Todesfälle_KW_AG10", engine="openpyxl"
)

# from dict
d = {}
for col in df_rki.columns:
    d[col] = df_rki[col].sum()
# orient="index": the dict keys become the index, the values the single column
df = pd.DataFrame.from_dict(d, orient="index", columns=["Covid_Tote"])

# deep copy 
df2 = df.copy()

joining

# join 2 df on common column IMS_ID
df = df1.set_index("IMS_ID").join(df2.set_index("IMS_ID"))

selecting

# select columns
df = df [ list_of_column_names ]
df = df [ [ "date", "gemeindeschluessel", "betten_frei", "betten_belegt" ] ]

sum

# sum of all values in column "Cases"
sum_cases = df["Cases"].sum()
# axis="index": add up along the index (down the rows) -> one sum per column
df_sums = df.sum(axis="index")
# axis="columns": add up along the columns (across each row) -> one sum per row
df_sums = df.sum(axis="columns")

filter

# filter on column value
df = df[df["Date"] == "2021-11-13"]
# exclude multiple values (~ negates the boolean mask returned by isin)
df = df[~df["Date"].isin(("2020-02-29", "2024-02-29", "2028-02-29"))]

# filter column value based on list
df = df[df["gemeindeschluessel"].isin(l_lkids)]
df = df[df["Date"].isin(("2020-02-29", "2024-02-29", "2028-02-29"))]
# is not in
df = df[~df["Date"].isin(("2020-02-29", "2024-02-29", "2028-02-29"))]

# remove data from latest week, as it might not be complete
df = df[df["YearWeek"] < df["YearWeek"].max()]

convert to date

# convert date_str to date
for c in ("DATE_DRAW", "RECEIVE_DATE", "PROCESSING_DATE"):
    df[c] = pd.to_datetime(df[c], format="%Y-%m-%d")

overwrite date

import datetime as dt

import numpy as np

# overwrite column data of last 3 weeks by None
df["DateAsDate"] = pd.to_datetime(df["Date"], format="%Y-%m-%d")
date_3w = dt.date.today() - dt.timedelta(weeks=3)
df.loc[df["DateAsDate"].dt.date >= date_3w, "MyColumn"] = None

# rolling takes NAN values into account, so I need to overwrite them as well
# np.where(condition, a, b): take a where the condition holds, else b
df3["Deaths_Covid_roll"] = np.where(
    df3["Deaths_Covid"].isnull(), np.nan, df3["Deaths_Covid_roll"]
)

# replace na by 0
df = df.fillna(0)

# remove word "Probable" from scorpio_call for better clustering
df["scorpio_call"] = df["scorpio_call"].replace(
    to_replace=r"^Probable ", value="", regex=True
)

Sorting

# sort by column
df = df.sort_values(by=['betten_belegt'], ascending=False)

Group By

# group and count
df_lineages = (
    df_all_data.groupby(["lineage", "RECEIVE_DATE"]).size().reset_index(name="count")
)

# group and sum all other columns
df = df.groupby(["YearWeek"]).sum()
df = df.groupby(['Impfdatum'])['Anzahl'].sum().reset_index()

df = df.groupby(['date', 'bundesland']).agg(
  {'faelle_covid_aktuell_invasiv_beatmet': 'sum',
   'betten_ges': 'sum'}
)

df_top_ten = (
    df.groupby("myColumn")
    .sum() # sums over remaining column "count"
    .sort_values(by="count", ascending=False)
    .head(10)
)

Column Handling

# rename a column; errors="raise" fails loudly if the old name does not exist
df = df.rename({
     "faelle_covid_aktuell_invasiv_beatmet": "beatmet",
}, axis=1, errors="raise")

# convert int to str adding leading zeros
# the result has to be assigned back, the conversion does not work in place
df["Sterbewoche"] = df["Sterbewoche"].astype(str).str.zfill(2)

drop columns

df = df.drop(
    ["Gesamt","Summe"],
    axis="columns",
)

sum up over all columns

# grand total: add up the sums of all columns
sum_cases = sum(df[col].sum() for col in df.columns)

Column Header Handling

# rename column headers by extracting some int values from a string
# characters 0-3 hold the year and characters 5-6 the week; both are
# combined into one int of the form YYYYWW
l2 = [int(col[0:4]) * 100 + int(col[5:7]) for col in df.columns]
df.columns = l2


Row Handling

add row from list and index

idx = df.index[-1] + 1
list_of_values = (...)
df.loc[idx] = list_of_values

Index Handling

# set index name
df.index.name = "YearWeek"

int indexes

# select column as index
df.set_index("Altersgruppe", inplace=True)

# reset index to start at 0
# drop=True discards the old index instead of keeping it as a column
df2 = df1[1 * 365 : 2 * 365].reset_index(drop=True)

# filter on index
df = df[df.index >= start_yearweek]

datetime indexes

import datetime as dt

# index to datetime
df.index = pd.to_datetime(df.index)

# filter on date
# drop data prior to 2020
df = df.loc['2020-01-01':]
# alternative:
df = df[df.index >= "2021-01-10"]

# date of the last row
date_last = pd.to_datetime(df.index[-1]).date()

# reindex and fill missing with 0
idx = pd.date_range('2020-01-01', date_last)
df = df.reindex(idx, fill_value=0)

# add missing dates
df = df.asfreq('D', fill_value=0)

# drop values of column for last 3 weeks
date_3w = dt.date.today() - dt.timedelta(weeks=3)
df.loc[df.index.date >= date_3w, "Cases"] = None

text indexes

df.set_index("Altersgruppe", inplace=True)

extract cell

# first value (by position) of column "betten_ges"
betten_ges = df2["betten_ges"].iloc[0]
# extract cell: column "Personen" at index label "Summe"
de_sum = df["Personen"].loc["Summe"]
# drop row based on index
df.drop("Summe", inplace=True)

more

# 7 day rolling average per year column; min_periods=1 also yields a value
# for the first rows, where fewer than 7 values are available
for year in ('2016', '2017', '2018', '2019', '2020'):
    df[year + '_roll'] = df[year].rolling(window=7, min_periods=1).mean().round(1)
# mean value of 4 columns
df['2016_2019_mean'] = df.iloc[:, [1, 2, 3, 4]
                               ].mean(axis=1)  # not column 0 = day
df['2016_2019_mean_roll'] = df['2016_2019_mean'].rolling(
    window=7, min_periods=1).mean().round(1)

df['2016_2019_roll_max'] = df.iloc[:, [6, 7, 8, 9]].max(axis=1)
df['2016_2019_roll_min'] = df.iloc[:, [6, 7, 8, 9]].min(axis=1)

# prepend empty values to df
# Jan und Feb values are missing for Covid Deaths series, so I need a couple of empty rows
l = [None] * 59
df1 = pd.DataFrame(data={'Deaths_Covid_2020': l})
# append df to df 
# pd.concat() replaces Series.append(), which was removed in pandas 2.0
df_covid_2020 = pd.DataFrame()
df_covid_2020['Deaths_Covid_2020'] = pd.concat(
    [df1['Deaths_Covid_2020'], df2['Deaths_Covid_2020']], ignore_index=True)

# ensure first row is from 28.2
assert (df2.iloc[0]['Date'] ==
        '2020-02-28'), "Error of start date, expecting 2020-02-28"

# copy
df2['Date'] = df0['Date']

# drop 2 rows from the beginning
df2.drop([0, 1], inplace=True)

# negative -> 0
df[df < 0] = 0

Helpers

def pandas_set_date_index(df, date_column: str):
    """Parse `date_column` as YYYY-MM-DD dates and use it as index.

    The DataFrame is modified in place and returned.
    """
    parsed_dates = pd.to_datetime(df[date_column], format='%Y-%m-%d')
    df[date_column] = parsed_dates
    df.set_index([date_column], inplace=True)
    return df

def pandas_calc_roll_av(df, column: str, days: int = 7):
    """Add the column `<column>_roll_av` with the rolling mean of `column`.

    The mean covers a window of `days` rows (fewer at the start) and is
    rounded to 1 decimal. The DataFrame is modified in place and returned.
    """
    target_column = column + '_roll_av'
    rolling_mean = df[column].rolling(window=days, min_periods=1).mean()
    df[target_column] = rolling_mean.round(1)
    return df

Transpose

# transpose to have yearweek as index
df = df.transpose()

Plotting using pandas and matplotlib

import matplotlib.pyplot as plt

horizontal bar chart of rows "plant" and "cnt"

import matplotlib.ticker as mtick  # for PercentFormatter

myPlot = df.plot.barh(legend=False, x='plant', y='cnt', linewidth=2.0, zorder=1, figsize=(12, 12))
# show the first row at the top instead of at the bottom
plt.gca().invert_yaxis()
# display the x axis values as percentages
plt.gca().xaxis.set_major_formatter(mtick.PercentFormatter())
# NOTE(review): in a barh plot the values run along the x axis; the 0..100
# range probably belongs to set_xlim() -- confirm
myPlot.set_ylim(0, 100)
plt.title('My Title')
plt.xlabel("")
# x y grid
plt.gca().set_axisbelow(True)  # for grid below the lines
plt.grid(axis='both')
# x grid for bar chart
plt.grid(axis='x')
plt.tight_layout()
plt.savefig(fname='out.png', format='png')

subplots

# one figure with a single axes object; both series are drawn into it
fig, axes = plt.subplots(figsize=(8, 6))
for zorder, column in enumerate(('pct_80pct_played', 'pct_all_played'), start=1):
    df[column].plot(linewidth=2.0, legend=True, zorder=zorder)
axes.set_ylim(0, 100)
# legend texts in plotting order
plt.legend(['prob. 80% played', 'prob. all played'])

Matplotlib

2 Subplots sharing xaxis

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

# initialize plot: 2 rows, 1 column, shared x axis (default figsize = 6.4,4.8)
fig, axes = plt.subplots(nrows=2, ncols=1, sharex=True, figsize=(6, 8), dpi=100)
fig.suptitle(f"COVID-19 in {long_name}")  # super title

# define colors for data: per panel (line on left axis, area on right axis)
colors = (('blue', 'red'), ('purple', 'green'))

# per panel: title, line column, area column, extra kwargs of the area plot,
# left y label, right y label, right y max, left tick step, right tick step
panels = (
    ("Inzidenzwert und -anstieg", 'Inzidenz', 'Inzidenzanstieg', {},
     'Inzidenz (7 Tage)', 'Inzidenzanstieg (7 Tage)', 150, 50, 25),
    ("Tote und Intensivstationsbelegung", 'Tote', 'Intensivstationsbelegung',
     {'linewidth': 2.0},
     'Tote (7 Tage pro Millionen)', 'Intensivstationen Anteil COVID-Patienten',
     40, 25, 10),
)

for ax, (color_line, color_area), panel in zip(axes, colors, panels):
    (title, col_line, col_area, area_kwargs,
     label_left, label_right, right_max, step_left, step_right) = panel

    # plot the data of a pandas dataframe: line on the left y axis,
    # filled area on the secondary (right) y axis
    df[col_line].plot(
        ax=ax, color=color_line, legend=False, secondary_y=False, zorder=2, linewidth=2.0)
    df[col_area].plot.area(
        ax=ax, color=color_area, legend=False, secondary_y=True, zorder=1, **area_kwargs)
    right = ax.right_ax  # the secondary axis created by secondary_y=True

    ax.set_title(title, fontsize=10)
    # axis label
    ax.set_ylabel(label_left)
    right.set_ylabel(label_right)
    # axis range: left axis starts at 0 with automatic top
    ax.set_ylim(0, )
    right.set_ylim(0, right_max)
    # tick freq
    # all are set to make charts better compareable
    ax.yaxis.set_major_locator(ticker.MultipleLocator(step_left))
    right.yaxis.set_major_locator(ticker.MultipleLocator(step_right))
    # tick format
    ax.yaxis.set_major_formatter(ticker.FormatStrFormatter('%d'))
    right.yaxis.set_major_formatter(ticker.PercentFormatter(decimals=0))
    # color of label and ticks
    ax.yaxis.label.set_color(color_line)
    ax.tick_params(axis='y', colors=color_line)
    right.yaxis.label.set_color(color_area)
    right.tick_params(axis='y', colors=color_area)
    # zorder problem
    # 1. per axis
    # 2. per series in axis including grid
    # Problem: can't solve the problem, that data of the secondary y axis is plotted below the grid of the 1st axis
    ax.grid(zorder=-1)
    ax.set_zorder(right.get_zorder()+1)
    ax.patch.set_visible(False)

# remove label as date is obvious
axes[1].set_xlabel("")

more stuff

import locale

import matplotlib.ticker as mtick

# using 2nd axis and filled area
ax1 = df.Cases_Last_Week_Per_100000.plot(
    color="blue", legend=False, secondary_y=False, zorder=2)
ax1.set_zorder(2)
# important: transparent background for line plot
ax1.set_facecolor('none')
ax2 = df.Cases_Last_Week_7Day_Percent.plot.area(
    color="red", legend=False, secondary_y=True, zorder=1)
ax2.set_zorder(1)

# set axis range: left axis starts at 0 with automatic top
ax1.set_ylim(0, )
ax2.set_ylim(0, 200)

# per axis: y label, no label for x axis, color of label and tick numbers
for ax, ylabel, color in (
    (ax1, 'Inzidenz (7 Tage)', 'blue'),
    (ax2, 'Anstieg (7 Tage)', 'red'),
):
    ax.set_ylabel(ylabel)
    # plt.xlabel("")
    ax.set_xlabel("")
    ax.yaxis.label.set_color(color)
    ax.tick_params(axis='y', colors=color)

# tics as percentage
ax2.yaxis.set_major_formatter(mtick.PercentFormatter())

# set locale language setting for date axis etc.
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')

# add text to bottom right
plt.gcf().text(1.0, 0.0, s="by Torben https://entorb.net , based on RKI and DIVI data", fontsize=8,
               horizontalalignment='right', verticalalignment='bottom', rotation='vertical')


GUI via tkinter

import tkinter as tk  # no need to install via pip


class App(tk.Tk):
    """Small fixed-size window whose buttons trigger automated mouse clicks."""

    def __init__(self):
        super().__init__()
        self.title("robot-CClicker")
        # geometry string: width x height + x offset + y offset
        self.geometry("200x200+0+1080")
        self.resizable(width=False, height=False)

        self.l_buttons = []  # all buttons, used to disable/enable them together
        self.__create_widgets()

    def __create_widgets(self):
        """Create all buttons and place them in the window."""
        self.btn_click500 = tk.Button(
            master=self,
            text="500 clicks",
            width=20,
            command=lambda: self.clickBigCookie(500),
        )
        self.l_buttons.append(self.btn_click500)

        for button in self.l_buttons:
            button.pack(anchor=tk.W)

    def clickBigCookie(self, num):
        """Perform `num` automated clicks; the buttons are disabled meanwhile.

        NOTE(review): `helper` and `self.posBigCockie` are not defined in this
        class; they have to be provided elsewhere.
        """
        for button in self.l_buttons:
            button["state"] = tk.DISABLED
        # Tk redraws widgets only when the event loop runs. This callback
        # blocks the event loop, so the redraw has to be forced here for the
        # disabled state to become visible.
        self.update_idletasks()
        try:
            helper.clickIt(self.posBigCockie[0], self.posBigCockie[1], num=num)
        finally:
            # handle the clicks made during the blocking call while the
            # buttons are still disabled, so that they are ignored
            self.update()
            # re-enable the buttons even if clicking failed
            for button in self.l_buttons:
                button["state"] = tk.NORMAL


if __name__ == "__main__":
    app = App()
    app.mainloop()