Python

From Torben's Wiki


Getting Started

Install

Python

brew install pyenv
pyenv install 3.10.10 
pyenv global 3.10.10
vim ~/.zshrc 
# add 
if command -v pyenv 1>/dev/null 2>&1; then
  eval "$(pyenv init -)"
fi

Editor: Visual Studio Code

excellent and free source-code editor that supports many languages.

See Wickie page Visual_Studio_Code for general setup, extension, config...

Python Extensions

Settings (CTRL + ,)

  • Extensions -> Python -> Formatting: Provider = black
  • Text Editor -> Editor: Format On Save
  • Text Editor -> Files:Eol -> \n
  • Linter: Ruff

Settings in settings.json

"python.analysis.completeFunctionParens": true,
"python.analysis.autoImportCompletions": true,
"python.analysis.inlayHints.functionReturnTypes": true,
"python.analysis.typeCheckingMode": "strict",
// Ruff
"[python]": {
  "editor.defaultFormatter": "charliermarsh.ruff",
  "editor.formatOnSave": true,
  "editor.codeActionsOnSave": {
    "source.fixAll": "explicit",
    "source.organizeImports.ruff": "explicit"
  },
},

Run your code

CTRL + F5 : run
F5        : run in debugger

Code Formatter Ruff

use software for handling the code formatting like "ruff" or "black", where Ruff is much faster and additionally provides linting.

pip install ruff
# or
pip install black

than activate in editor like vs code

My Templates

Standard Template

see header documentation example 1 and example 2 see Google Python Styleguide

#!/usr/bin/env python3.12

# by Torben Menke https://entorb.net

"""
Standard template.
"""

# Built-in/Generic Packages
# import os

# External Packages
# import openpyxl

print("Moin Moin")

Object Oriented Template

#!/usr/bin/env python3.12

# by Torben Menke https://entorb.net

"""Object Oriented Template."""


class MyDevice:

    """
    MyDevice class.
    """

    def __init__(
        self: "MyDevice",
        devicename: str = "",
        *,  # all following arguments are keyword-only
        verbose: bool = False
    ) -> None:
        """Initialize the device."""
        # name of the device (e.g. for log messages)
        self.devicename = devicename
        self.verbose = verbose  # whether to log information or be quiet

    def log(self: "MyDevice", msg: str) -> None:
        """Log a message."""
        print(msg)


class SMU236(MyDevice):

    """
    Device SMU236.
    """

    def __init__(
        self: "SMU236",
        gpibaddress: float,
        devicename: str = "",
        *,  # all following arguments are keyword-only
        verbose: bool = False
    ) -> None:
        """Initialize the device."""
        MyDevice.__init__(self, devicename=devicename, verbose=verbose)
        self.gpibaddress = gpibaddress


if __name__ == "__main__":
    SMU = SMU236(1234)
    SMU.log("Starting")

Compile to .exe

pip install pyinstaller
pyinstaller --onefile --console your.py
# for Excel and Matplotlib these options are required
--hidden-import=openpyxl --hidden-import=matplotlib --hidden-import pandas.plotting._matplotlib 

(Python - py2exe is deprecated)

Basics

Naming Conventions

Google Python Style Guide:

module_name, package_name, ClassName, method_name, ExceptionName, function_name, GLOBAL_CONSTANT_NAME, global_var_name, instance_var_name, function_parameter_name, local_var_name

Installing packages

python -m pip install --upgrade pip

pip install somemodule
# or 
pip3 install somemodule
# or read from file
pip install -r requirements.txt

# uninstall
pip uninstall somemodule

# using a web proxy
# set proxy for windows cmd session
SET HTTPS_PROXY=http://myProxy:8080
(afterwards --proxy setting below no longer required
or
pip install --proxy http://myProxy:8080 somemodule

# list outdated packages
pip list --outdated

# update package
pip install --upgrade pyinstaller

# updating all via Windows Powershell (from [1])
pip freeze | %{$_.split('==')[0]} | %{pip install --upgrade $_}

# updating all via Bash (from [2])
pip freeze | grep -v '^\-e' | cut -d = -f 1  | xargs -n1 pip install --upgrade

# downgrade
pip install --upgrade pandas==1.2.4

generate requirements.txt

pip install pipreqs
pipreqs ./

Loops

ATTENTION: The loops do not create a new variable scope. Only functions and modules introduce a new scope!

for p in all_files:
    print(p)

for i, p in enumerate(all_files):
    print(i, p)

for i in range(10):
    print(i)
# del i 

while i <= 100:
    i += 1
    ...
    if sth1:
        continue # start next loop
    if sth2:
        break # exit loop

# inline if (requires a dummy else):
print("something") if sth else 0

Math

see Python - Math for linear regression

Modulo

15 % 4
# > 3

Integer Division

17 // 4
# > 4

Random

import random
random.randint(1000000, 9999999)

Basic Objects

Variables

del var  # delete / undef a variable
var = None  # sets to null

# check if variable is defined
if "var" in locals():
    pass
# for object oriented projects:
if "var" in self.__dict__:
    pass

# Access global variables in functions
var = 123

def test() -> None:
    global var  # point to global instead of creation of local var
    var = 321

Strings

# num <-> str
s = str(i)  # int to string
f = float(s)  # str -> float
i = int(s)
str(round(f, 1))  # round first
# tests
s.isdigit()  # 0-9
# note isdecimal() does also not match '1.1'

# printf: 1 digit
s = f"{value:0.1f}"
s = "%0.1f" % value

Modify Strings

# get string from prompt
s = input("Enter Text: ")

s = s.strip()  # trim spaces from both sides, rstrip for right only
s = s.lower()  # lower case
s = s.upper()  # upper case
s = s.title()  # upper case for first char of word

# upper case first letter of each word and also removes multiple and trailing spaces
import string
s = string.capwords(s)

# replace
s.replace(x, y)

# trim whitespaces from left and right
s.strip()

# replace all (multiple) whitespaces by single space ' '
s = " ".join(s.split())

# generate key value pairs from dict
# key1=value1&key2=value2
param_str = "&".join("=".join(tup) for tup in dict.items())

# repeat string multiple times
s * 5  # = s+s+s+s+s

Substrings

# find a substring:
if x in s:  # True / False
    pass
if len(s) > 0:
    pass

# handling substrings
a = "abcd"
b = a[:1] + "o" + a[2:]
# > 'aocd'

s = "Hello there !bob@"
i1 = s.find("!") + 1
i2 = s.find("@")
substr = s[i1:i2]

def substr_between(s: str, s1: str, s2: str) -> str:
    assert s1 in s, f"E: can't find '{s1}' in '{s}'"
    assert s2 in s, f"E: can't find '{s1}' in '{s}'"
    i1 = s.find(s1) + len(s1)
    i2 = s.find(s2)
    assert i1 < i2, f"E: '{s1}' not before '{s2}' in '{s}'"
    return s[i1:i2]

Binary, raw strings, html encoding

# Binary Strings
sb = b"asdf"
# or
sb = str.encode("asdf")
s = sb.decode("ascii")  # decode binary strings
sb = s.encode("ascii")  # encode string to binary

# raw string
s = r"c:\Windows"  # no escape of \  needed

# convert utf-8 to html umlaute
s = "Nürnberg".encode("ascii", "xmlcharrefreplace").decode()
# -> Nürnberg

Merge variables in string / sprintf

print("Renner =", i)
print("Renner = %3d" % i)  # leading 0's
print(f"Renner = {i}")

# place formatted numbers in a string / sprintf
s = "The {:.1f}% {} cost {:05.2f} euros".format( 5.1, "beer", 3.50)
print(s)
# > The 5.1% beer cost 03.50 euros

s = f"The length is {72.8958:.2f} meters"
# > The length is 72.90 meters

Lists

like @array in Perl

lst = [1, 2, 3, 4, 5, 6]
lst = [x / 2 for x in range(10)]
lst = [None] * 10 # Initiate list of None elements:
len(lst) # length
lst2 = lst[0:10]  # get elements 0-10
for i in lst:
    print(i)

clone list

# dangerous: creates a link to the original list
list_2 = my_list  # M's elements are LINKS to L's
# clones can be achieved via:
list_2 = my_list.copy
list_2 = my_list[:]
list_2 = list(my_list)

list modifications

lst.pop()  # returns and removes the last item
lst.pop(i)  # returns and removes the item at  position int i
lst.insert(i, x)  # insert item x at position int i
lst.remove(x)  # removes the first occurrence of  item x
lst.append(x)  # append a single element
lst.extend(m)  # append elements of another list

lst.reverse()  # reverse the order of the list
lst = sorted(["B", "a", "c"], key=str.casefold)  #  case insensitive / ignore case

# list to string
s = "".join(lst)
s = "\n".join(lst)
# string to list
lst = s.split()  # default: split on space
lst = s.split(",")

# remove empty values from end
while L[-1] == "":
    L.pop()

# check and remove items from list
 # from https://stackoverflow.com/a/6024599
 # iterates in-situ via reversed index
for i in range(len(lst) - 1, -1, -1):
    element = lst[i]
    if check(element):
        del lst[i]

check if item is in list

x in lst
x not in lst
lst.count(x)  # how many items x are in the list
i = lst.index("word")  # find in list / returns the  position of the first match in list

pair 2 lists

# zip: merge 2 lists to list of tuples
data = list(zip(data_x, data_y, strict=True))

# unzip: split list of pairs into 2 lists
data_x, data_y = zip(*data, strict=True)

# Cartesian product of lists / tuples
import itertools
for i in itertools.product(*list_of_lists):
    print(i)

sort multidim list

lst = sorted(lst, key=lambda x: x[0], reverse=False)
lst = sorted(lst, key=lambda row: (row["Wann"], row ["Wer"]), reverse=False)

filter list

lines = [x for x in lines if not x.startswith ("word")]
lst = ["asdf", "asdf2", "qwertz"]
lst = [elem for elem in lst if "asdf" in elem]

for each element

# trim/strip spaces for each element
lines = [s.strip() for s in lines]
# modify each item in list by adding constant string
l = [s + ';' + v for v in l]
 
# modify item in list
for idx, line in enumerate(cont):
    if "K1001/1" in line:
        line = "K1001/1 Test Nr " + str(i) + "\n"
        cont[idx] = line
        break

Tuples

Ordered sequence, with no ability to replace or delete items

tpl = (1,2,3,4,5,6)

list -> tuple

tpl = tuple(lst)

combine 2 tuples

tpl = tpl1 + tpl2

Dictionaries

like %hash in Perl

d = {"key1": "value1", "key2": 123}
d["key3"] = (1, 2, 3)
del d["key3"]

len(d)
d.clear()
d.copy()
d.keys()
d.values()
d.items()  # returns a list of tuples (key, value)
d.get(k)  # returns value of key k
d.get(k, x)  # returns value of key k; if k is not  in d it returns x
count = d.get("key", 0) + 1 # nice for counters
d.pop(k)  # returns and removes item k
d.pop(k, x)  # returns and removes item k; if k is  not in d it returns x
# checks
x in d
x not in d

# dict can have tuple as key
tpl = ("key1", "key2", 123)
d[tpl] = 123456

# loop over all key-value pairs
for key, value in d.items():
    print(f"{key} = {value}")

# sorted keys:
for key in sorted(dict.keys()):
    pass
# sorted values, reversed
for key, value in sorted(d.items(), key=lambda item:  item[1], reverse=True):
    pass

# join / merge 2 dicts
d.update(d2)

# flatten dict of dict
flat = d.copy()
meta = d.pop("metadata")
flat.update(meta)

MultiDim Dictionaries

d = {}
d["item1"] = {}
d["item1"]["value"] = 1.909e18
d["item1"]["name"] = "Item 1"
d["item2"] = {}
d["item2"]["value"] = 1.725e18
d["item2"]["name"] = "Item 2"

for k in d.keys():
    d[k]["value"] = d[k]["value"] * 1.1

Datetime, Date and Time

import datetime as dt
from zoneinfo import ZoneInfo
TZ_DE = ZoneInfo("Europe/Berlin")
TZ_UTC = dt.timezone.utc

Create

# date
date = dt.date(2023, 12, 31)
date = dt.date.fromisocalendar(int(year), int(week),  int(daynum))  # daynum: 1..7
date = dt.date.fromisoformat("2020-03-10")
date = dt.datetime.strptime(datestr, "%y%m%d").date()
date_today = dt.datetime.now(tz=TZ_DE).date()
date_yesterday = dt.datetime.now(tz=TZ_DE).date() -  dt.timedelta(days=1)
days_overdue = (date_completed - date_due).days
# to add one month (which is not supported by timedelta) 
from dateutil.relativedelta import relativedelta
date_end = date_start + relativedelta(months=1)

# datetime
dt_now = dt.datetime.now(tz=TZ_DE)
my_dt = dt.datetime(2023, 12, 31, 14, 31, 56,  tzinfo=TZ_DE)  # 2023-12-31 14:31:56
my_dt = dt.datetime.fromtimestamp(my_timestamp,  tz=TZ_DE)
my_dt = dt.datetime.fromisoformat ("2017-01-01T12:30:59.000000")
my_dt = dt.datetime.fromisoformat("2020-03-10  06:01:01+00:00")
s = "2020-03-10T06:01:01Z"
my_dt = dt.datetime.fromisoformat(s.replace("Z", " +00:00"))
my_date_local = my_dt.astimezone(tz=TZ_DE).date()

# datetime -> date
my_date = my_dt.date()
# date -> datetime
my_date = dt.date(2023, 12, 31)
my_dt = dt.datetime(my_date.year, my_date.month,  my_date.day, tzinfo=TZ_DE)

# to string
datestr = my_dt.strftime("%y%m%d")
datestr = my_dt.strftime("%Y-%m-%d %H:%M:%S")
# alternative using time
datastr = time.strftime("%Y-%m-%d %H:%M:%S")
# now in UTC without milliseconds
datestr = dt.datetime.now(tz=dt.timezone.utc).replace(microsecond=0).isoformat() + "Z"

# German format
import locale
locale.setlocale(locale.LC_ALL, "de_DE")
print(my_date.strftime("%a %x"))  # Mo 25.12.2023

# Calendar week
week = my_date.isocalendar()[1]
print("KW%02d" % week)

# first day of quarter
def get_first_day_of_the_quarter(my_date: dt.date) -> dt.date:
    return dt.date(my_date.year, 1 + 3 * ((my_date.month - 1) // 3), 1)

rounding datetimes to minutes

def floor_dt_minutes(my_dt: dt.datetime, res: int =  5) -> dt.datetime:
    """Floor (=round down) minutes to X min  resolution."""
    min_new = res * (my_dt.minute // res)
    return my_dt.replace(minute=min_new, second=0,  microsecond=0)

def ceil_dt_minutes(my_dt: dt.datetime, res: int =  5) -> dt.datetime:
   """Ceil (=round up) minutes to X min resolution."""
   min_new = res * (1 + my_dt.minute // res)
   return my_dt.replace(minute=0, second=0, microsecond=0) + dt.timedelta(
       minutes=min_new
   )

def round_dt_minutes(my_dt: dt.datetime, res: int =  5) -> dt.datetime:
   """Cound minutes to X min resolution."""
   min_old_dec = float(my_dt.minute) + float(my_dt.second * 60)
   min_new = res * round(min_old_dec / res)
   return my_dt.replace(minute=0, second=0, microsecond=0) + dt.timedelta(
       minutes=min_new
   )

my_dt = dt.datetime.fromisoformat("2011-11-04  00:05:23+00:00")
print(f"original: {my_dt}")
print(f"floored: {floor_dt_minutes(my_dt,5)}")
print(f"ceileded: {ceil_dt_minutes(my_dt,5)}")
print(f"rounded: {round_dt_minutes(my_dt,5)}")

Timing / Time elapsed

import time 
timestart = time.time()
sec = time.time() - timestart
print(f"Time elapsed: {sec:.2f} sec")

OS, Argpars, etc.

Hostname

from socket import gethostname
print(gethostname())

Checking Operating System

import os
import sys
 
if os.name == "posix":
    print("posix/Unix/Linux")
elif os.name == "nt":
    print("windows")
else:
    print("unknown os")
    sys.exit(1)  # throws exception, use quit() to   close / die silently

Get filename of python script

my_file_path = __file__

alternative using sys package

from sys import argv
myFilename = argv[0]

accessing os envrionment variables

import os
print(os.getenv("tmp"))
# better:
try:
    s = os.environ[key] # throws error if unset
except KeyError:
    error_message(f"Environment variable {key} not set.")

Command Line Arguments

ArgumentParser

import argparse

parser = (
    argparse.ArgumentParser()
)  # construct the argument parser and parse the  arguments
# -h comes automatically

# Boolean Parameter
parser.add_argument(
    "-v", "--verbose", help="increase output  verbosity", action="store_true"
)  # store_true -> Boolean Value

# Choice Parameter
# restrict to a list of possible values / choices
# parser.add_argument("--choice", type=int, choices= [0, 1, 2], help="Test choices")

# Positional Parameter (like text.py 123)
# parser.add_argument("num", type=int, help="Number  of things")

# Required Parameter
# parser.add_argument("-i", "--input", type=str,  required=True, help="Path of file")

# Optional Parameter
parser.add_argument("-n", "--number", type=int,  help="Number of clicks")
# Optional Parameter with Default
parser.add_argument(
    "-s",
    "--seconds",
    type=int,
    default=sec_default,
    help="Duration of clicking, default = %i (sec)"  % sec_default,
)

args = vars(parser.parse_args())

if args["verbose"]:
    pass  # do nothing
# print ("verbosity turned on")
if args["number"]:
    print("num=%i" % args["number"])

match case statement

(new in python 3.10)

match args:
  case {"sap": "prod", "version": 1}:
    ...
  case {"sap": "prod", "version": 2}:
    ...
  case _: # default case

File Access

Pathlib

for migrating see table [3]

from pathlib import Path

p = Path("dir/test.txt")

log_file = Path(__file__).with_suffix(".log")  # __file__ is name of python script

my_fname = p.name  #  alternative to basename
my_ext = p.suffix
(filepath_without_ext, file_ext) = (p.stem, p.suffix)
my_dir = p.parent
my_parent_dir = p.parents[1]
p2 = p.with_suffix(".json")
p3 = p.parent / (p.name + "-autofix.tex")

# checks
Path(my_file_str).exists()
Path(my_file_str).is_file()
Path(my_file_str).is_dir()

# loop over glob of files matching wildcard
for file_out in Path("mydir").glob("*-autofix.tex"):

# loop over dirs
p = Path() # cwd
list_of_repos = [x for x in p.iterdir() if x.is_dir() and (x / ".git").is_dir()]


alternative using old os functions Split path into folder, filename, ext

import os
(dirName, fileName) = os.path.split(f)
(fileBaseName, fileExtension) = os.path.splitext(fileName)
fileOut = os.path.splitext(fileIn)[0] + "-out.txt"

File Manipulations: copy, move, delete, touch

from pathlib import Path

# copy
from shutil import copyfile
copyfile(Path("file-source.txt"), Path("dir") / Path("file-target.txt"))

# move / rename
Path("file.txt").replace(Path("file2.txt"))

# delete
Path("file.txt").unlink()

# touch
Path("file.txt").touch()

File size and timestamp

# size
Path("file.txt").stat().st_size

# timestamp last modified
Path("file.txt").stat().st_mtime

Dir create/mkdir and delete

# create dir
from pathlib import Path
Path("myDir1/myDir2").mkdir(parents=True, exist_ok=True)
# delete dir
import shutil
shutil.rmtree(d)

Loop over Directories

Fetch Dir Contents / Loop over Files

glob via pathlib

for fileOut in Path("mydir").glob("*-autofix.tex"):

Get list of files in directory, filter dirs from list, filter by ext

dir= "/path/to/some/dir"
listoffiles = [ f for f in os.listdir(dir) if os.path.isfile(os.path.join(dir ,f)) and f.lower()[-4:] == ".gpx"]
listoffiles.sort()

Traverse in Subdirs

# walk into path an fetch all files matching extension jpe?g
files = []
for (dirpath, dirnames, filenames) in os.walk("."):
    dirpath = dirpath.replace("\\", "/")
    for file in filenames:
        if file.endswith(".txt"):
            files.append(dirpath + "/" + file)
        elif re.search(r"\.jpe?g$", file, re.IGNORECASE):
            files.append(dirpath + "/" + file)

File Parsing

File General

File Read

path_file_in = Path("file.txt")

# via pathlib
cont = path_file_in.read_text(encoding="utf-8") # note: utf-8-sig for UTF-8-BOM

# general
with path_file_in.open(encoding="utf-8") as fh:
    cont = fh.read()
    # or
    lst = fh.readlines()
    # or
    line = fh.readline()
    # or
    for line in fh:
        print(line)

fh = path_file_in.open(encoding="utf-8")
...
fh.close()

File Write

path_file_out = Path("out/1/out.txt")

# write via pathlib
path_file_out.write_text("some text")

# write general
with path_file_out.open(mode="w", encoding="utf-8", newline="\n") as fh:
    # w = overWrite file ; a = append to file
    # If running Python in Windows, "\n" is automatically replaced by "\r\n".
    # To prevent this use newline='\n'
    fh.writelines(lst)  # no linebreaks
    # or
    fh.write("\n".join(lst))
    # or
    for line in lst:
        fh.write(line)

    # Force update of file contents without closing it
    fh.flush()

# alternative
fh = path_file_out.open(mode="w", encoding="utf-8", newline="\n")
...
fh.close()

CSV

CSV Read

import csv
from pathlib import Path

with Path("data.csv").open(mode="r", encoding="utf-8") as fh:  # for Excel use ANSI
    csv_reader = csv.DictReader(fh, dialect="excel", delimiter="\t")
    for row in csv_reader:
        print(f'\t{row["name"]} works in the {row["department"]} department')

CSV Write

import csv
from pathlib import Path

# simple
with Path("data.tsv").open(mode="w", encoding="utf-8", newline="\n") as fh:
    csvwriter = csv.writer(fh, delimiter="\t")
    csvwriter.writerow(("Date", "Confirmed"))

# dictwriter
with Path("data.tsv").open(mode="w", encoding="utf-8", newline="\n") as fh:
    csvwriter = csv.DictWriter(
        fh,
        delimiter="\t",
        extrasaction="ignore",
        fieldnames=["date", "occupied_percent", "occupied", "total"],
    )
    csvwriter.writeheader()
    for d in my_list_of_dicts:
        d["occupied_percent"] = round(100 * d["occupied"] / d["total"], 1)
        csvwriter.writerow(d)

JSON

JSON Read

import json
# from file
with path_to_download_file.open(encoding="utf-8") as fh:
    d_json = json.load(fh)
# from string
d_json = json.loads(response_text)
def json_read(file_path: Path) -> list[dict[str, str]]:
    """
    Read JSON data from file.
    """
    with file_path.open(encoding="utf-8") as fh:
        json_data = json.load(fh)
    return json_data

JSON Write

Write dict to file in JSON format, keeping utf-8 encoding

import json

with path_to_download_file.open(mode="w", encoding="utf-8", newline="\n") as fh:
    json.dump(my_dict, fh, ensure_ascii=False, sort_keys=False, indent=2)
def json_write(file_path: Path, json_data: list[dict[str, str]]) -> None:
    """
    Write JSON data to file.
    """
    with file_path.open("w", encoding="utf-8", newline="\n") as fh:
        json.dump(json_data, fh, ensure_ascii=False, sort_keys=False, indent=2)

Excel

Excel Read

import openpyxl

workbook = openpyxl.load_workbook(
    pathToMyExcelFile,
    data_only=True,  # read values instead of formulas
    read_only=True,  # suppresses: "UserWarning: wmf image format is not supported so the image is being dropped"
)
sheet = workbook["mySheetName"]
# or fetch active sheet
sheet = workbook.active
cell = sheet["A34"]
# or
cell = sheet.cell(row=34, column=1)  # index start here with 1
print(cell.value)
# or
print(sheet.cell(column=col, row=row).value)

Excel Write

import openpyxl

workbook = openpyxl.Workbook()
sheet = workbook.active
cell = sheet["A34"]
# or
cell = sheet.cell(row=i, column=j)  # index starts at 1
cell.value = "asdf"
workbook.save("out.xlsx")

Regular Expressions

See [4]

See [5] for an online tester

multiple flags are joined via pipe |

s = re.sub("asdf.*", r"qwertz", s, flags=re.DOTALL | re.IGNORECASE)

Lookahead and Lookbehind

pos lookahead: (?=...)
neg lookahead: (?!...)
pos lookbehind (?<=...)
neg lookbehind (?<!...)

matching

import re

# V0: simple 1
myPattern = "(/\*\*\* 0097_210000_0192539580000_2898977_0050 \*\*\*/.*?)($|/\*\*\*)"
myRegExp = re.compile(myPattern, re.DOTALL)
myMatch = myRegExp.search(cont)
assert myMatch != None, f"golden file not found in file {filename}"
cont_golden = myMatch.group(1)

# V1: simple 2
assert (
    re.match("^[a-z]{2}$", d_settings["country"]) != None
), f'Error: county must be 2 digit lower case. We got: {d_settings["country"]}'

# V2: find and count
was = r"""Part: ([^<+])"""
cnt_parts = len(re.findall(was, cont))
cont = re.sub(was, r"\n\nTeil: \1\n\n", cont)
assert cnt_parts == 4, f"{cnt_parts} == 4"
assert "Part:" not in cont
Match email
def checkValidEMail(email: str) -> bool:
    # from https://stackoverflow.com/posts/719543/timeline bottom edit
    if not re.fullmatch(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$", email):
        print("Error: invalid email")
        quit()
    return True

Find all

myMatches = re.findall('href="([^"]+)"', cont)
for myMatch in myMatches:
    print(myMatch)

substring

import re

# simple via search
lk_id = re.search('^.*timeseries\-(\d+)\.json$', f).group(1)

# simple via sub
myPattern = "^.*" + s1 + "(.*)" + s2 + ".*$"
out = re.sub(myPattern, r"\1", s)

# more robust including an assert
def substr_between(s: str, s1: str, s2: str) -> str:
    """
    returns substring of s, found between strings s1 and s2
    s1 and s2 can be regular expressions
    """
    myPattern = s1 + '(.*)' + s2
    myRegExp = re.compile(myPattern)
    myMatches = myRegExp.search(s)
    assert myMatches != None, f"E: can't find '{s1}'...'{s2}' in '{s}'"
    out = myMatches.group(1)
    return out
matchObj = re.search(r"(\d+\.\d+)", text
if matchObj:
  price = float( '%s' % (matchObj).group(0) )

Naming of match groups

(?P<name>...), see [6]

Search and Replace

From [7]
re.sub(regex, replacement, str) performs a search-and-replace across subject, replacing all matches of regex in str with replacement. The result is returned by the sub() function. The str string you pass is not modified.

s = re.sub("  +", " ", s)

Splitting

From [8]
split() splits a string into a list delimited by the passed pattern. The method is invaluable for converting textual data into data structures that can be easily read and modified by Python as demonstrated in the following example that creates a phonebook.

First, here is the input. Normally it may come from a file, here we are using triple-quoted string syntax:

>>> input = """Ross McFluff: 834.345.1254 155 Elm Street
...
... Ronald Heathmore: 892.345.3428 436 Finley Avenue
... Frank Burger: 925.541.7625 662 South Dogwood Way
...
...
... Heather Albrecht: 548.326.4584 919 Park Place"""

The entries are separated by one or more newlines. Now we convert the string into a list with each nonempty line having its own entry:

>>> entries = re.split("\n+", input)
>>> entries
['Ross McFluff: 834.345.1254 155 Elm Street',
'Ronald Heathmore: 892.345.3428 436 Finley Avenue',
'Frank Burger: 925.541.7625 662 South Dogwood Way',
'Heather Albrecht: 548.326.4584 919 Park Place']

Finally, split each entry into a list with first name, last name, telephone number, and address. We use the maxsplit parameter of split() because the address has spaces, our splitting pattern, in it:

>>> [re.split(":? ", entry, 3) for entry in entries]
[['Ross', 'McFluff', '834.345.1254', '155 Elm Street'],
['Ronald', 'Heathmore', '892.345.3428', '436 Finley Avenue'],
['Frank', 'Burger', '925.541.7625', '662 South Dogwood Way'],
['Heather', 'Albrecht', '548.326.4584', '919 Park Place']]

replace cont by linebreaks

def replace_cont_by_linebreaks(s: str, regex: str) -> str:
    """
    Replace regex in s by the number of linebreaks it originally contained.
    """
    myMatches = re.findall(regex, s, flags=re.DOTALL)
    for match in myMatches:
        linebreaks = match.count("\n")
        s = s.replace(match, "\n" * linebreaks, 1)
    return s


Image/Picture/Photo

from PIL import Image, ImageFilter  # pip install Pillow

fileIn = "2018-02-09 13.56.25.jpg"
# Read image
img = Image.open(fileIn)

PROBLEM:
PIL Image.save() drops the IPTC data like tags, keywords, copywrite, ...
better using https://imagemagick.org instead when tags shall be kept

Resize

# Resize keeping aspect ration -> img.thumbnail
# drops exif data, exif can be added from source file via exif= in save, see below
size = 1920, 1920
img.thumbnail(size, Image.ANTIALIAS)

Export file

fileOut = os.path.splitext(fileIn)[0] + "-edit.jpg"
try:
    img = Image.open(fileIn)
    img.save(fp=fileOut, format="JPEG", quality='keep')  # exif=dict_exif_bytes
    # JPEG Parameters
    # * qualitiy : 'keep' or 1 (worst) to 95 (best), default = 75. Values above 95 should be avoided.
    # * dpi : tuple of integers representing the pixel density, (x,y)
except IOError:
    print("cannot write file '%s'" % fileOut)

Export Progressive / web optimized JPEG

from PIL import ImageFile  # for MAXBLOCK for progressive export
fileOut = os.path.splitext(fileIn)[0] + "-progressive.jpg"
try:
    img.save(fp=fileOut, format="JPEG", quality=80, optimize=True, progressive=True)
except IOError:
    ImageFile.MAXBLOCK = img.size[0] * img.size[1]
    img.save(fp=fileOut, format="JPEG", quality=80, optimize=True, progressive=True)

JPEG Meta Data: EXIF and IPTC

IPTC: Tags/Keywords
from iptcinfo3 import IPTCInfo  # this works in pyhton 3!
iptc = IPTCInfo(fileIn)
if len(iptc['keywords']) > 0:  # or supplementalCategories or contacts
    print('====> Keywords')
    for key in sorted(iptc['keywords']):
        s = key.decode('ascii')  # decode binary strings
        print(s)
EXIF via piexif
import piexif  # pip install piexif
exif_dict = piexif.load(img.info['exif'])
print(exif_dict['GPS'][piexif.GPSIFD.GPSAltitude])
# returns list of 2 integers: value and donator  -> v / d
# (340000, 1000) => 340m
# (51, 2) => 25.5m

# Modify altitude
exif_dict['GPS'][piexif.GPSIFD.GPSAltitude] = (140, 1)  # 140m

# write to file
exif_bytes = piexif.dump(exif_dict)
fileOut = os.path.splitext(fileIn)[0] + "-modExif.jpg"
try:
    img.save(fp=fileOut, format="jpeg", exif=exif_bytes, quality='keep')
except IOError:
    print("cannot write file '%s'" % fileOut)

or

exif_dict = piexif.load(fileIn)
for ifd in ("0th", "Exif", "GPS", "1st"):
    print("===" + ifd)
    for tag in exif_dict[ifd]:
        print(piexif.TAGS[ifd][tag]["name"], "\t",
              tag, "\t", exif_dict[ifd][tag])
print(exif_dict['0th'][306]) # 306 = DateTime
EXIF via exifread
# Open image file for reading (binary mode)
fh = open(fileIn, "rb")
# Return Exif tags
exif = exifread.process_file(fh)
fh.close()
# for tag in exif.keys():
#     if tag not in ('JPEGThumbnail', 'TIFFThumbnail', 'Filename', 'EXIF MakerNote'):
#         print("%s\t%s" % (tag, exif[tag]))
print(exif["Image DateTime"])
print(exif["GPS GPSLatitude"])
print(exif["GPS GPSLongitude"])
EXIF GPS via PIL
# from https://developer.here.com/blog/getting-started-with-geocoding-exif-image-metadata-in-python3
def get_exif(filename):
    image = Image.open(filename)
    image.verify()
    image.close()
    return image._getexif()


def get_labeled_exif(exif):
    labeled = {}
    for (key, val) in exif.items():
        labeled[TAGS.get(key)] = val
    return labeled


def get_geotagging(exif):
    if not exif:
        raise ValueError("No EXIF metadata found")
    geotagging = {}
    for (idx, tag) in TAGS.items():
        if tag == "GPSInfo":
            if idx not in exif:
                raise ValueError("No EXIF geotagging found")
            for (key, val) in GPSTAGS.items():
                if key in exif[idx]:
                    geotagging[val] = exif[idx][key]
    return geotagging


def get_decimal_from_dms(dms, ref):
    degrees = dms[0][0] / dms[0][1]
    minutes = dms[1][0] / dms[1][1] / 60.0
    seconds = dms[2][0] / dms[2][1] / 3600.0
    if ref in ["S", "W"]:
        degrees = -degrees
        minutes = -minutes
        seconds = -seconds
    return round(degrees + minutes + seconds, 5)


def get_coordinates(geotags):
    lat = get_decimal_from_dms(geotags["GPSLatitude"], geotags["GPSLatitudeRef"])
    lon = get_decimal_from_dms(geotags["GPSLongitude"], geotags["GPSLongitudeRef"])
    return (lat, lon)


exif = get_exif(fileIn)
exif_labeled = get_labeled_exif(exif)
print(exif_labeled["DateTime"])

geotags = get_geotagging(exif)
print(get_coordinates(geotags))

Template Matching / Image Regocnition Using CV2 / OpenCV

see Python - CV2

Optical Character Recognition (OCR)

see Python - OCR


Templates/Snippets

Diff of 2 files

import difflib

fh1 = p_file1.open("r", encoding="utf-8")
fh2 = p_file1.open("r", encoding="utf-8")
diff = difflib.ndiff(fh1.readlines(), fh2.readlines())
delta = "".join(line for line in diff if line.startswith(("+ ", "- ")))
print(delta)

GPX parsing

import gpxpy
import gpxpy.gpx
# Elevation data by NASA: see lib at https://github.com/tkrajina/srtm.py
fh_gpx_file = open(gpx_file_path, 'r')
gpx = gpxpy.parse(fh_gpx_file)
#  Loops for accessing the data
for track in gpx.tracks:
    for segment in track.segments:
        for point in segment.points:
for waypoint in gpx.waypoints:
for route in gpx.routes:
    for point in route.points: 
# interesting properties of point / waypoint objects:
point.time
point.latitude
point.longitude
point.source
waypoint.name

TypeHints

from typing import Any, Dict, cast
creds = cast(dict[str, str], tomllib.load(f))  # type: ignore
o = cast(Dict[str, Any], tomllib.load(f))  # type: ignore
o["sap"] = cast(Dict[str, str], o["sap"])
o["settings"] = cast(Dict[str, str | int | bool], o["settings"])
o["settings"]["sleep_time"] = cast(int, o["settings"]["sleep_time"])

Pylance/Pyright

import tomllib
# shows warning: Import "xyz" could not be resolved
# fix by 
import tomllib # pyright: ignore

asserts function argument validation

aus Python Kurs von Carsten Knoll

def eine_funktion(satz, ganzzahl, zahl2, liste):
  if not type(satz) == str:
    print "Datentpyfehler: satz"
    return -1
  if not isinstance(ganzzahl, int):
    print "Datentpyfehler: ganzzahl"
    return -2
  if not isinstance(liste, (tuple, list)):
    print "Datentpyfehler: liste"
    return -3
  # Kompakteste Variante (empfohlen): 
  assert zahl2 > 0, "Error: zahl2 ist nicht > 0" # Assertation-Error bei Nichterfuellung
def F(x):
  if not isinstance(x, (float, int)):
    msg = "Zahl erwartet, %s bekommen" % type(x)
    raise ValueError(msg)
  return x**2

better:

def F(x):
  assert isinstance(x, (float, int)), "Error: x is not of type float or int"
  return x**2
assert variant in [
    "normal",
    "gray",
    "cannyedge",
], "Error: variant is not in 'normal', 'gray', 'cannyedge'"

Exceptions

see [9]

Catching

Catch keyboard interrupt and do a "save exit"

try:
    i = 0
    while 1:
        i += 1
        print(i)
except KeyboardInterrupt:
    print("stopped")

Catch all exceptions

try:
  [...]
except Exception as e:
    print("Exception: ", e)

Raising Exceptions

if resp.status_code != 200:  # noqa: PLR2004
    msg = f"Bad response. status code:{resp.status_code}, text:\n{resp.text}"
    raise ValueError(msg) from None

Custom Exceptions

try: 
  raise Exception("HiHo")
raise ValueError¶

perl grep and map

from [10]

def grep(list, pattern):
    expr = re.compile(pattern)
    return [elem for elem in list if expr.match(elem)]
or
filteredList = filter(lambda x: x < 7 and x > 2, unfilteredList)


def map(list, was, womit):
    return list(map(lambda i: re.sub(was, womit, i), list))
    # was = '.*"(\d+)".*'
    # womit = r"\1"

unit testing using pytest

install via

pip install pytest

activate in vscode, see [11]: To enable testing, use the Python: Configure Tests command on the Command Palette.

see https://docs.pytest.org/en/6.2.x/assert.html#assert

test_1.py:

import myLib # my custom lib to test

class TestClass:
    def test_one(self):
        x = "this"
        assert "h" in x

    def test_two(self):
        assert myLib.multiply(1, 2) == 2

    def test_three(self):
        assert myLib.multiply(2, 2) == 2

project structure

simplest structore: test next to script

Alternative without tests dir:

./main.py
./helper.py 
./helper_test.py

and

from helper import fnc1
standalone model with tests dir

if the test files are placed inside a ./tests/ dir

./main.py
./helper.py 
tests/helper_test.py

in order to run the helper_test.py via python (not not only via pytest) you need to add

sys.path.insert(0, Path(__file__).parent.parent.as_posix()
from helper import fnc1

parametrize

import pytest
@pytest.mark.parametrize(
    ("test_input", "expected"),
    [("", None), ("PT30M", 30), ("PT3H", 3 * 60), ("PT2H30M", 2 * 60 + 30)],
)
def test_task_est_to_minutes(test_input: str, expected: int) -> None:
    assert task_est_to_minutes(test_input) == expected
fixures: prepare and cleanup test_data

to automatically run before and after each testcase

@pytest.fixture(autouse=True)
def _setup_tests():  # noqa: ANN202
    cache_prepare_lists()
    cache_prepare_tasks()

    yield

    cache_cleanup_test_data()

Test coverage report

pip install pytest-cov
# console output
pytest --cov

# html report in coverage_report/index.html with details per file
pytest --cov --cov-report=html:coverage_report

to exclude a code block from coverage report, add

# pragma: no cover

Sleep / Wait for input

sleep for a while

import time
time.sleep(60)

wait for user input

input("press Enter to close")

Suppress Warnings

see [12]

import warnings
warnings.filterwarnings("ignore", message=".*native_field_num.*not found in message.*")


Logging

V3 simple logging

# main file:
import logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s\t%(levelname)s\t%(name)s\t%(message)s",
    handlers=[
        RotatingFileHandler(
            Path(__file__).with_suffix(".log"),
            maxBytes=10485760,  # 10 MB = 10*1024*1024,
            backupCount=1,
            encoding="utf-8",
        )
    ],
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(Path(__file__).stem)
logger.info("Start")

# other files:
import logging
logger = logging.getLogger(__name__)

V2: rotating file and STDOUT

# 1. setup
import logging
from logging.handlers import RotatingFileHandler

logfile = "myApp.log"
maxBytes = 20 * 1024 * 1024
backupCount = 5
loglevel_console = logging.INFO
loglevel_file = logging.DEBUG

# create logger
logger = logging.getLogger("root")
logger.setLevel(loglevel_file)

# console handler
ch = logging.StreamHandler()
ch.setLevel(loglevel_console)

# rotating file handler
# fh = logging.FileHandler(logfile)
fh = RotatingFileHandler(logfile, maxBytes=maxBytes, backupCount=backupCount)
fh.setLevel(loglevel_file)

# create formatter and add it to the handlers
# %(name)s = LoggerName, %(threadName)s = TreadName
formatter = logging.Formatter(
    "%(asctime)s - %(levelname)s - %(name)s - %(threadName)s - %(message)s "
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)

# add the handlers to the logger
logger.addHandler(ch)
logger.addHandler(fh)

logger.debug("DebugMe")
logger.info("Starting")
logger.warning("Attention")
logger.error("Something went wrong")
logger.critical("Something seriously went wrong ")

# 2. in other files/modules now use
import logging

logger = logging.getLogger(__name__)
logger.info("text")

...
except Exception as e:
    logger.exception("Unhandeled exception")
    quit()

V1: Simple

import logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s\t%(levelname)s\t%(message)s",  # \t%(name)s
    filename=Path(__file__).with_suffix(".log"), # uncomment to switch to stdout
    filemode="a",
)
logger = logging.getLogger(__name__)
logger.debug("Some text")
logger.info("Some text")
logger.warning("Some text")
logger.error("Some text")
logger.critical("Some text")
try:
...
except psycopg2.DatabaseError:
    logger.exception("Database connection error: %s")
    raise

Process Bar

see tqdm

from tqdm import tqdm
for i in tqdm(range(10000)):
    ....

CGI Web development

# Print necessary headers.
print("Content-Type: text/html")
print()

# errors and debugging info to browser
import cgitb
cgitb.enable()


Access URL or Form Parameters

# V2 from https://www.tutorialspoint.com/python/python_cgi_programming.htm
import cgi
form = cgi.FieldStorage()
username = form.getvalue('username')
print(username)
# V1
import sys
import urllib.parse
query = os.environ.get('QUERY_STRING')
query = urllib.parse.unquote(query, errors="surrogateescape")
d = dict(qc.split("=") for qc in query.split("&"))
print(d)

CGI Backend Returning JSONs

#!/usr/local/bin/python3.6
# -*- coding: utf-8 -*-

import cgi
import json

# Print necessary headers.
print("Content-type: application/json")
print()

def get_form_parameter(para: str) -> str:
    "asserts that a given parameter is set and returns its value"
    value = form.getvalue(para)
    assert value, f"Error: parameter {para} missing"
    assert value != "", f"Error: parameter {para} missing"
    return value
 
response = {}
response['status'] = "ok"

try:
    action = get_form_parameter("action")
    response['action'] = action
    if action == "myAction":
        ...

except Exception as e:
    response['status'] = "error"
    d = {"type": str(type(e)), "text": str(e)}
    response["exception"] = d

finally:
    print(json.dumps(response))

Databases

PostgreSQL

Improved helper function

"""Helper functions for DB access."""

import datetime as dt
import logging

import psycopg2
import psycopg2.extras
from creds_db import credentials_DRM_P01 as creds_db

# Configure logging
logger = logging.getLogger(__name__)

connection: psycopg2.extensions.connection = None  # type: ignore
cursor_query: psycopg2.extensions.cursor = None  # type: ignore
cursor_write: psycopg2.extensions.cursor = None  # type: ignore


def connect() -> (
    tuple[
        psycopg2.extensions.connection,
        psycopg2.extensions.cursor,
        psycopg2.extensions.cursor,
    ]
):
    """Connect to the database."""
    try:
        connection = psycopg2.connect(**creds_db)
        cursor_query = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cursor_write = connection.cursor()
        logger.info(
            "Connected to database %s on host %s",
            creds_db["database"],
            creds_db["host"],
        )
    except psycopg2.DatabaseError:
        logger.exception("Database connection error: %s")
        raise
    else:
        return connection, cursor_query, cursor_write


def disconnect() -> None:
    """Disconnect from the database."""
    try:
        if cursor_query:
            cursor_query.close()
        if cursor_write:
            cursor_write.close()
        if connection:
            connection.close()
        logger.info("Disconnected from the database")
    except psycopg2.DatabaseError:
        logger.exception("Error during disconnection: %s")


def exists(url: str, valid_hours: float) -> bool | None:
    """
    Check if a record exists.

    Returns
    -------
    None for missing record
    True for valid record
    False for expired record

    """
    delete_at = dt.datetime.now(tz=dt.UTC) + dt.timedelta(hours=valid_hours)
    sql = """
SELECT delete_at > %s AS "valid"
FROM my_table
WHERE url = %s LIMIT 1;
"""
    cursor_query.execute(sql, (delete_at, url))
    res = cursor_query.fetchone()
    return res[0] if res else None


def query1(url: str) -> dict | None:
    """Execute a query."""
    sql = """
SELECT *
FROM my_table
WHERE url = %s LIMIT 1
    """
    cursor_query.execute(sql, (url,))
    return cursor_query.fetchone()  # type: ignore


def insert(url: str, response: str, delete_in_hours: float) -> None:
    """Insert a record."""
    delete_at = dt.datetime.now(tz=dt.UTC) + dt.timedelta(hours=delete_in_hours)
    sql = """
INSERT INTO my_table (url, response, delete_at)
VALUES (%s, %s, %s);
    """
    cursor_write.execute(sql, (url, response, delete_at))
    connection.commit()


def delete(url: str) -> None:
    """Delete a record."""
    sql = """
DELETE FROM my_table
WHERE url = %s;
"""
    cursor_write.execute(sql, (url,))
    connection.commit()


def delete_expired(in_hours: float) -> None:
    """Delete records that expire in less than in_hours from now."""
    delete_at = dt.datetime.now(tz=dt.UTC) + dt.timedelta(hours=in_hours)

    sql = """
DELETE FROM my_table
WHERE delete_at < %s;
"""
    cursor_write.execute(sql, (delete_at,))
    connection.commit()


connection, cursor_query, cursor_write = connect()


Basics

import psycopg2
import psycopg2.extras

credentials = {
    "host": "localhost",
    "port": 5432,
    "database": "myDB",
    "user": "myUser",
    "password": "myPwd",
}
connection = psycopg2.connect(**credentials)
cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor)

l_bind_vars = [["A1", "A2"], ["B1", "B2"]]

sql = """
SELECT * FROM myTable
WHERE 1=1 
AND status NOT IN ('CLOSED') 
AND ColA = %s 
AND ColB = %s 
ORDER BY created DESC
"""
cursor.execute(sql, l_bind_vars)
d_data = dict(cursor.fetchone())

export result to csv file

sql1 = "SELECT * FROM table"
sql2 = "COPY (" + sql1 + ") TO STDOUT WITH CSV HEADER DELIMITER '\t'"
        with open("out.csv", "w") as file:
            cursor.copy_expert(sql2, file)

ReadConfigFile to connect to PostgreSQL

from [13] database.ini

[postgresql]
host=dbhost
port=5432
database=dbname
user=dbuser
password=dbpass
from configparser import ConfigParser


def config(filename="database.ini", section="postgresql"):
    parser = ConfigParser()
    parser.read(filename)
    # get section, default to postgresql
    db = {}
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            db[param[0]] = param[1]
    else:
        raise Exception(
            "Section {0} not found in the {1} file".format(section, filename)
        )
    return db

main.py

import psycopg2
from config import config


def connect():
    """Connect to the PostgreSQL database server"""
    conn = None
    try:
        params = config()  # read connection parameters
        print("Connecting to the PostgreSQL database...")
        conn = psycopg2.connect(**params)
        cur = conn.cursor()  # create a cursor
        print("PostgreSQL database version:")
        cur.execute("SELECT version()")  # execute a statement
        db_version = cur.fetchone()
        print(db_version)
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()
            print("Database connection closed.")


if __name__ == "__main__":
    connect()

SQL Lite / SQLite

see page SQLite

InfluxDB

see InfluxDB

Internet Access

Send E-Mails

see Python - eMail

Download data using browser UA / REST

import requests

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",
}
resp = requests.get(
    url,
    headers=headers,
    timeout=3,  # timeout in sec, requests should always have a timeout!
    # timeout=(1,3), # 1s connect-timout, 3s read-timeout
)
if resp.status_code != 200:  # noqa: PLR2004
    msg = f"E: bad response. status code:{resp.status_code}, text:\n{resp.text}"
    raise Exception(msg)  # noqa: TRY002

Download only if cache is too old

import time
from pathlib import Path
import requests

def fetch_url_or_cache(file_cache: Path, url) -> str:
    """Fetch URL and cache in a file."""
    if check_cache_file_available_and_recent(
        file_path=file_cache, max_age=3600, verbose=False
    ):
        with file_cache.open(mode="r", encoding="utf-8") as fh:
            s = fh.read()
    else:
        s = fetch(url=url)
        with file_cache.open(mode="w", encoding="utf-8", newline="\n") as fh:
            fh.writelines(s)
    return s

def check_cache_file_available_and_recent(
    file_path: Path,
    max_age: int = 3500,
) -> bool:
    """Check if cache file exists and is recent."""
    cache_good = False
    if file_path.exists() and (time.time() - file_path.stat().st_mtime < max_age):
        cache_good = True
    return cache_good

# or
def check_cache_file_available_and_recent_verbose(
    file_path: Path, max_age: int = 3600, *, verbose: bool = False
) -> bool:
    """Check if cache file exists and is recent."""
    cache_good = True
    if not file_path.exists():
        if verbose:
            print(f"No Cache available: {file_path}")
        cache_good = False
    if cache_good and time.time() - (time.time() - file_path.stat().st_mtime < max_age):
        if verbose:
            print(f"Cache too old: {file_path}")
        cache_good = False
    return cache_good

def fetch(url) -> str:
    """Fetch URL."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:108.0) Gecko/20100101 Firefox/108.0",  # noqa: E501
    }
    resp = requests.get(url, headers=headers, timeout=5)
    if resp.status_code == 200:  # noqa: PLR2004
        return resp.content.decode("ascii")
    else:  # noqa: RET505
        msg = f"E: bad response. status code:{resp.status_code}, text:\n{resp.text}"
        raise Exception(msg)  # noqa: TRY002

Download using urllib.request

from urllib.request import urlopen

url = "https://pomber.github.io/covid19/timeseries.json"
with urlopen(url) as response:  # noqa: S310
    response_text = response.read()

HTML parsing and extracting of elements

V2: via BeautifulSoup

from bs4 import BeautifulSoup  # pip install beautifulsoup4

soup = BeautifulSoup(cont, features="html.parser")
my_element = soup.find("div", {"class": "user-formatted-inner"})
my_body = my_element.prettify()
# my_body = my_element.encode()
# my_body = str(my_element)

V1: via lxml and xpath

from lxml import html
import requests

page = requests.get(url)
tree = html.fromstring(page.content)
tbody_trs = tree.xpath("//*/tbody/tr")
l_rows = []
for tr in tbody_trs:
    l_columns = []
    if len(tr) != 15:
        continue
    for td in tr:
        l_columns.append(td.text_content())
        l_rows.append(list(l_columns))

HTML entities to unicode

import html
cont = html.unescape(cont)

GUI Interactions

Take Screenshot

import pyautogui # (c:\Python\Scripts\)pip install pyautogui
# pyautogui does only support screenshots on monitor #1
...
screenshot = pyautogui.screenshot()
# screenshot = pyautogui.screenshot(region=(screenshotX,screenshotY, screenshotW, screenshotH))
screenshot = np.array(screenshot) 
# Convert RGB to BGR 
screenshot = screenshot[:, :, ::-1].copy()

Mouse Actions

def clickIt(x,y,key="") :
  x0, y0 = pyautogui.position()
  if key != "": # crtl, shift
    pyautogui.keyDown(key)
  pyautogui.moveTo(x, y, duration=0.2)
  pyautogui.click(x=x , y=y, button='left', clicks=1, interval=0.1)
  if key != "": # crtl, shift
    pyautogui.keyUp(key)
  pyautogui.moveTo(x0, y0)

Web Automation

from selenium import webdriver
from selenium.webdriver.common.keys import Keys

# from selenium.webdriver import Firefox
from selenium.webdriver.firefox.options import Options

import os
import time
import glob

class StravaUserMapDL():
    def __init__(self):
        self.driver = webdriver.Firefox()

    def login(self):
        driver = self.driver
        url = "https://www.somewebpage.com"
        email = "myemail"
        password = "mypassword"
        driver.get(url)

        title = driver.title
        urlIs = driver.current_url
        cont = driver.page_source #  as string
        FILE = open(filename,"w") # w = overWrite file ; a = append to file
        FILE.write(cont)
        FILE.close()         

        # handle login if urlIs != url
        if (urlIs != url): 
            # activate checkbox 'remember_me'
            elem = driver.find_element_by_id('remember_me')
            if (elem.is_selected() == False):
                elem.click()
            assert elem.is_selected() == True
            elem = driver.find_element_by_id('email')
            elem.send_keys(email)
            elem = driver.find_element_by_id('password')
            elem.send_keys(password)
            elem.send_keys(Keys.RETURN)
            # Wait until login pages is replaced by real page
            urlIs = driver.current_url
            while (urlIs != url):
                time.sleep(1)
                urlIs = driver.current_url
            print (urlIs)

            # results = driver.find_elements_by_class_name('following')
            # results = driver.find_elements_by_tag_name('li')

            # print(results[0].text)
        assert (urlIs == url)

Unit Tests using Web Automation

import unittest
from selenium import webdriver
from selenium.webdriver.common.keys import Keys

#from selenium.webdriver import Firefox
from selenium.webdriver.firefox.options import Options

import os
import time

class PythonOrgSearch(unittest.TestCase):
#    def __init__(self,asdf):
#        self.driver = webdriver.Firefox() 

    def setUp(self):
        print ("setUp")
        # headless mode:
        # opts = Options()
        # opts.set_headless()
        # assert opts.headless  # Operating in headless mode
        # self.driver = webdriver.Firefox(options=opts)

        self.driver = webdriver.Firefox()

    def test_search_in_python_org(self):
        driver = self.driver
        driver.get("http://www.python.org")
        self.assertIn("Python", driver.title)
        elem = driver.find_element_by_name("q")
        elem.send_keys("pycon")
        elem.send_keys(Keys.RETURN)
        assert "No results found." not in driver.page_source
        print ("fertig: python_org")

    def tearDown(self):
        print ("tearDown")
        print ("close Firefox")
        self.driver.close() # close tab
        self.driver.quit() # quit browser
        # os._exit(1) # exit unittest without Exception


if __name__ == "__main__":
    try:
        unittest.main()
    except SystemExit as e:
        os._exit(1)

Cryptography and Hashing

Hashing via SHA256

def gen_SHA256_string(s: str) -> str:
    m = hashlib.sha256()
    m.update(s.encode("ascii"))
    return m.hexdigest()

Hashing via MD5

(MD5 is not secure, better use SHA256)

def gen_MD5_string(s: str) -> str:
    m = hashlib.md5()
    m.update(s.encode("ascii"))
    return m.hexdigest()

Password hashing via bcrypt

import bcrypt
pwd = 'geheim'
pwd = pwd.encode("utf-8")
# or 
pwd = b'geheim'

hashed = bcrypt.hashpw(pwd, bcrypt.gensalt())
if bcrypt.checkpw(pwd, hashed):
    print("It Matches!")
    print(hashed.decode("utf-8"))

To use version 2a instead of 2b (default):

bcrypt.gensalt(prefix=b"2a")

Multiprocessing, subprocesses and Threading

see Python_-_Multithreading as well

use processes for CPU limited work
use threads for I/O limited work

Simple single process

import subprocess
process = subprocess.run(["sudo", "du", "--max-depth=1", mydir], capture_output=True, text=True)
print (process.stdout)

old, depricated way:

os.system( "gnuplot " + gpfile)

Multiprocessing

see Python - Multithreading as well

V2 using pool and starmap

import multiprocessing
import os

def worker(i: int, s: str) -> list:
    result = (i, s, os.getpid())
    return result

if __name__ == "__main__":
    # gen. pile of work
    l_pile_of_work = []
    for i in range(1_000):
        tup = (i, "n" + str(i))
        l_pile_of_work.append((tup))
    # gen pool of processes
    num_processes = min(multiprocessing.cpu_count(), len(l_pile_of_work))
    pool = multiprocessing.Pool(processes=num_processes)
    # start processes on pile of work
    l_results_unsorted = pool.starmap(
        func=worker, iterable=l_pile_of_work  # each item is a list of 2 parameters
    )
    
    # or if only one parameter:
    # l_results_unsorted = pool.map(doit_de_district, l_pile_of_work)
    
    l_results = sorted(l_results_unsorted)  # sort by i

V1

import subprocess
l_subprocesses = []  # queue list of subprocesses
max_processes = 4

def process_enqueue(new_process_parameters):
    global l_subprocesses
    # wait for free slot
    while len(l_subprocesses) >= max_processes:
        process_remove_finished_from_queue()
        time.sleep(0.1)  # sleep 0.1s
    process = subprocess.Popen(new_process_parameters,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                               universal_newlines=True)
    l_subprocesses.append(process)

def process_remove_finished_from_queue():
    global l_subprocesses
    i = 0
    while i <= len(l_subprocesses) - 1:
        process = l_subprocesses[i]
        if process.poll != None:  # has already finished
            process_print_output(process)
            l_subprocesses.pop(i)
        else:  # still running
            i += 1

def process_print_output(process):
    """waits for process to finish and prints process output"""
    stdout, stderr = process.communicate()
    if stdout != :
        print(f'Out: {stdout}')
    if stderr != :
        print(f'ERROR: {stderr}')

def process_wait_for_all_finished():
    global l_subprocesses
    for process in l_subprocesses:
        process_print_output(process)
    l_subprocesses = []  # empty list of done subprocesses

process_enqueue(l_parameters1)
...
process_enqueue(l_parameters999)
process_wait_for_all_finished()

Threading

import threading
import queue
import os
import time

def worker(q_work: queue.Queue, results: dict):
    while not q_work.empty():
        i, s = q_work.get()
        time.sleep(.1)
        result = (i, s, os.getpid())
        results[i] = result
        q_work.task_done()

if __name__ == '__main__':
    d_results = {}  # threads can write into dict
    # gen. pile of work
    l_pile_of_work = []
    for i in range(1_000):
        tup = (i, "n"+str(i))
        l_pile_of_work.append((tup))
    # convert list of work to queue
    q_pile_of_work = queue.Queue(
        maxsize=len(l_pile_of_work))  # maxsize=0 -> unlimited
    for params in l_pile_of_work:
        q_pile_of_work.put(params)
    # gen threads
    num_threads = 100
    l_threads = []  # List of threads, not used here
    for i in range(num_threads):
        t = threading.Thread(name='myThread-'+str(i),
                             target=worker,
                             args=(q_pile_of_work, d_results),
                             daemon=True)
        l_threads.append(t)
        t.start()
    q_pile_of_work.join()  # wait for all threas to complete
    l_results_unsorted = d_results.values()
    l_results = sorted(l_results_unsorted)  # sort by i


asyncio — Asynchronous I/O

see https://docs.python.org/3/library/asyncio-task.html

import asyncio
import time

# basics
# task = asyncio.create_task(coro())
# Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.

# sleep
# await asyncio.sleep(1)

# Running Tasks Concurrently and gathers the return values in list L
# L = await asyncio.gather(coro(x1,y1), coro(x2,y2), coro(x3,y3))

async def say_after(delay, what):
    # Coroutines declared with the async/await syntax
    await asyncio.sleep(delay)
    print(what)

async def main():
    # The asyncio.create_task() function to run coroutines concurrently as asyncio Tasks.
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(1, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Pandas

see Pandas

Matplotlib

see Matplotlib

GUI via tkinter

import tkinter as tk  # no need to install via pip


class App(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("robot-CClicker")
        self.geometry("200x200+0+1080")
        self.resizable(width=False, height=False)

        self.l_buttons = []
        self.__create_widgets()

    def __create_widgets(self):
        self.btn_click500 = tk.Button(
            master=self,
            text="500 clicks",
            width=20,
            command=lambda: self.clickBigCookie(500),
        )
        self.l_buttons.append(self.btn_click500)

        for button in self.l_buttons:
            button.pack(anchor=tk.W)

    def clickBigCookie(self, num):
        # TODO: the disabling of the buttons is not working
        for button in self.l_buttons:
            button["state"] = tk.DISABLED
        helper.clickIt(self.posBigCockie[0], self.posBigCockie[1], num=num)
        for button in self.l_buttons:
            button["state"] = tk.NORMAL


if __name__ == "__main__":
    app = App()
    app.mainloop()

Protobuf

convert .proto file to python class

see https://www.datascienceblog.net/post/programming/essential-protobuf-guide-python/

protoc my_message.proto --python_out ./ 

read/decode protobuf message

parse message from file

import machine_message_pb2

with open("out.bin", "rb") as f:
    my_message = my_message_pb2.my_message()
    my_message.ParseFromString(f.read())
print(machine_message)

create/encode protobuf message

import machine_message_pb2

my_message = my_message_pb2.my_message()
my_message.data.field1 = 1
my_message.data.field2 = "asdf"

with open("out.bin", "wb") as f:
    f.write(my_message.data.SerializeToString())

venv / virtual environments

pip install --upgrade pip
python -m venv .venv --prompt $(basename $(pwd))
source .venv/bin/activate
pip install -r requirements.txt
...
deactivate

Pydantic data validation

Function input parameter validation

see https://docs.pydantic.dev/usage/validation_decorator/

@validate_arguments
def sum(x: int, y: float) -> float:
    print(x, y)
    return x + y
 
print(sum(1.1, 1.1))
# -> 2.1

Read config file

TOML

see https://learnxinyminutes.com/docs/toml/

minimal example: config.toml

# SAP API endpoint
[general]
sleep_time = 60
use_proxy = true

tool.py

try:
    import tomllib  # comes with python3.11
except ModuleNotFoundError:
    import tomli as tomllib  # pip install tomli
with open("config.toml", "rb") as f:
   o = tomllib.load(f)  # type: ignore
if o["settings"]["use_proxy"]:
...

for validation, see https://realpython.com/python-toml/

ConfigParser: INI Config File Reading

better use TOML

Config.ini

[Section1]
Cursor         = 205E18
Grandma        =  18E18
Farm           =  11E18
Mine           = 514E18
Factory        = 155E18

test.py

from configparser import ConfigParser

config = ConfigParser(
    interpolation=None
)  # interpolation=None -> treats % in values as char % instead of interpreting it
config.read("Config.ini", encoding="utf-8")

print(config.getint("Section1", "key1"))
print(config.getfloat("Section1", "key2"))
print(config.get("Section1", "key3"))

for sec in config.sections():
    d_settings = {}
    for key in config.options(sec):
        value = config.get(sec, key)
        d_settings[key] = value
        print("%15s : %s" % (key, value))

VCard / vcf

import codecs
import vobject  # pip install vobject

obj = vobject.readComponents(codecs.open(file_in, encoding="utf-8").read())  # type: ignore
contacts: list[vobject.base.Component] = list(obj)  # type: ignore

card = contacts[0]

if "bday" not in card.contents:
    continue

# bday: remove 'VALUE': ['DATE']
card.contents["bday"][0].params = {}  # type: ignore

# remove all fields but "bday", "n"
for key in card.contents.copy():  # loop over copy, to allow for deleting keys
    if key not in ("n", "bday"):
        del card.contents[key]

# recreate fn based on n
card.add("fn")
n = card.contents["n"][0]
fn = f"{n.value.given} {n.value.additional} {n.value.family}"  # type: ignore
fn = re.sub(r"\s+", " ", fn)
card.fn.value = fn  # type: ignore

with open("out.vcf", mode="w", encoding="utf-8", newline="\n") as fhOut:
    fhOut.write(card.serialize())

Sentry Exception monitoring

see Sentry

MQTT

#!/usr/bin/env python3.12
"""Read power data from Tasmota device via MQTT."""

import json
from typing import Any

import paho.mqtt.client as mqtt
from mqtt_credentials import hostname, password, port, username
from paho.mqtt.reasoncodes import ReasonCode


def on_connect(
    mqtt_client: mqtt.Client,
    userdata: Any,
    flags: dict[str, int],
    reason_code: ReasonCode,
    properties: Any,
) -> None:
    """Callback when the client connects MQTT broker."""  # noqa: D401
    if reason_code == 0:
        print("Connected to MQTT")
        # print(f"MQTT protocol version: {mqtt_client._protocol}")

        # Subscribing in on_connect() means that if we lose the connection and
        # reconnect then subscriptions will be renewed.
        mqtt_client.message_callback_add("tele/tasmota_MT681/SENSOR", on_message)
        mqtt_client.subscribe([("tele/tasmota_MT681/SENSOR", 0)])

    else:
        print(f"Connection to MQTT broker failed. Error: {reason_code}")


def on_disconnect(
    mqtt_client: mqtt.Client,
    userdata: Any,
    reason_code: ReasonCode,
    properties: Any,
) -> None:
    """Callback when the client is disconnected from the MQTT broker."""  # noqa: D401
    if reason_code > 0:
        print(f"Unexpected disconnection. Error: {reason_code}")


def on_message(
    mqtt_client: mqtt.Client,
    userdata: Any,
    message: mqtt.MQTTMessage,
) -> None:
    """Callback when a Tasmota message is received from the MQTT broker."""  # noqa: D401
    s = message.payload.decode()
    data = json.loads(s)
    # {'Time': '2024-03-16T06:44:08', 'MT681': {'Total_in': 16.4396, 'Power_cur': 80, 'Total_out': 14.6403}}  # noqa: E501

    total_i = data["MT681"]["Total_in"]
    total_o = data["MT681"]["Total_out"]

    print(f"In:\t{total_i}\nOut:\t{total_o}")

    mqtt_client.disconnect()
    mqtt_client.loop_stop()


try:
    # Create an MQTT client instance
    mqtt_client: mqtt.Client = mqtt.Client(
        mqtt.CallbackAPIVersion.VERSION2, "Python-Client-1"
    )  # type: ignore
    mqtt_client.username_pw_set(username, password)

    mqtt_client.on_connect = on_connect

    # Connect to the MQTT broker
    mqtt_client.connect(hostname, port, keepalive=60)

    # NOT: while True, as that eats the CPU !!!
    mqtt_client.loop_forever()
except KeyboardInterrupt:
    print("KeyboardInterrupt, disconnecting from MQTT broker.")
    mqtt_client.disconnect()
    mqtt_client.loop_stop()

Caching Functions

from functools import lru_cache

@lru_cache(maxsize=128)  # None equals to @cache declarator
def fnc(n:int):

JWT Token

 try:
     decoded_token = jwt.decode(token, options={"verify_signature": False})
 except jwt.ExpiredSignatureError:
     msg = "Token has expired."
     logger.exception(msg)
 except jwt.InvalidTokenError:
     msg = "Invalid token."
     logger.exception(msg)

Streamlit

Chart via Altair

Line Chart

chart = st.line_chart(df["value"])
# corresponds to
import altair as alt
c = (
   alt.Chart(df)
   .mark_line()
   .encode(
       x=alt.X("created", title=None),
       y=alt.Y("value", title=None),
   )
)
st.altair_chart(c, use_container_width=True)

Bar Chart

chart = (
    alt.Chart(df)
    .mark_bar()
    .encode(
        x=alt.X("yearmonth(date):T", title="Month"),
        y="cnt:Q",
        color="status:N",
        tooltip=["yearmonth(date):T", "status:N", "cnt:Q"],
    )
    # .properties(width=600, height=400)
)
st.altair_chart(chart, use_container_width=True)
:T stands for Temporal. It indicates that the field contains date or time values.
:N stands for Nominal. It indicates that the field contains categorical data, which represents discrete categories or labels.
:Q stands for Quantitative. It indicates that the field contains numerical data that can be measured and compared.

Excel Download

def excel_download_buttons(df: pd.DataFrame, file_name: str = "export.xlsx") -> None:
    """Show prepare data and download buttons."""
    if st.button(label="Click to prepare download"):
        buffer = io.BytesIO()
        with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
            df.to_excel(writer, sheet_name="Sheet1", index=False)
            writer.close()

            st.download_button(
                label="Download data as Excel",
                data=buffer,
                file_name=file_name,
                mime="application/vnd.ms-excel",
            )