Initial commit

This commit is contained in:
Thomas Türk 2023-05-04 15:06:55 +02:00
commit aea0d76acb
38 changed files with 4085 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
/dist
/docs/build/
/src/PyAPplus64.egg-info
__pycache__

9
LICENSE Normal file
View File

@ -0,0 +1,9 @@
MIT License
Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

6
MANIFEST.in Normal file
View File

@ -0,0 +1,6 @@
include examples
include docs/*
exclude docs/builddocspdf.sh
include docs/source/*
include src/PyAPplus64/py.typed

38
README.md Normal file
View File

@ -0,0 +1,38 @@
# PyAPplus64
Das Paket `PyAPplus64` enthält eine Sammlung von Python Tools für die Interaktion mit dem ERP-System APplus 6.4.
Es sollte auch für andere APplus Versionen nützlich sein.
Zielgruppe sind APplus-Administratoren und Anpassungs-Entwickler. Die Tools erlauben u.a.
- einfacher Zugriff auf SOAP-Schnittstelle des APP-Servers
+ damit Zugriff auf SysConfig
+ Zugriff auf Tools `nextNumber` für Erzeugung der nächsten Nummer für ein Business-Object
+ ...
- Zugriff auf APplus DB per direktem DB-Zugriff und mittels SOAP
+ automatischer Aufruf von `completeSQL`, um per AppServer SQL-Statements um z.B. Mandanten erweitern zu lassen
+ Tools für einfache Benutzung von `useXML`, d.h. für das Einfügen, Löschen und Ändern von Datensätzen
mit Hilfe des APP-Servers. Genau wie bei Änderungen an Datensätzen über die Web-Oberfläche und im Gegensatz
zum direkten Zugriff über die Datenbank werden dabei evtl. zusätzliche
Checks ausgeführt, bestimmte Felder automatisch gesetzt oder bestimmte Aktionen angestoßen.
- das Duplizieren von Datensätzen
+ zu kopierende Felder aus XML-Definitionen werden ausgewertet
+ Abhängige Objekte können einfach ebenfalls mit-kopiert werden
+ Änderungen wie beispielsweise Nummer des Objektes möglich
+ Unterstützung für Kopieren dyn. Attribute
+ Anlage neuer Objekte mittels APP-Server
+ Datensätze können zwischen Systemen kopiert und auch in Dateien gespeichert werden
+ Beispiel: Kopieren von Artikeln mit Arbeitsplan und Stückliste zwischen Deploy- und Prod-System
- einfaches Erstellen von Excel-Reports aus SQL-Abfragen
+ mittels Pandas und XlsxWriter
+ einfache Wrapper um andere Libraries, spart aber Zeit
- ...
In `PyAPplus64` wurden die Features (vielleicht leicht verallgemeinert)
implementiert, die ich für konkrete Aufgabenstellungen benötigte. Ich gehe davon
aus, dass im Laufe der Zeit weitere Features hinzukommen.
`PyAPplus64` erlaubt den schreibenden Zugriff auf die APplus Datenbank und beliebige
Aufrufe von SOAP-Methoden. Unsachgemäße Nutzung kann Ihre Daten zerstören. Benutzen Sie
`PyAPplus64` daher bitte vorsichtig.

20
docs/Makefile Normal file
View File

@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = source
BUILDDIR = build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

4
docs/builddocs.sh Executable file
View File

@ -0,0 +1,4 @@
#!/bin/sh
sphinx-apidoc -T -f ../src/PyAPplus64 -o source/generated
sphinx-build -a -E -b html source build/html

14
docs/builddocspdf.sh Executable file
View File

@ -0,0 +1,14 @@
#!/bin/sh
##
## Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
##
## This file is part of PyAPplus64.
##
sphinx-apidoc -T -f ../src/PyAPplus64 -o source/generated
sphinx-build -a -E -b latex source build/pdf
cd build/pdf
pdflatex pyapplus64.tex
pdflatex pyapplus64.tex

35
docs/make.bat Normal file
View File

@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=source
set BUILDDIR=build
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.https://www.sphinx-doc.org/
exit /b 1
)
if "%1" == "" goto help
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd

View File

@ -0,0 +1,40 @@
Abhängigkeiten
==============
pyodbc
------
Für die Datenbankverbindung wird ``pyodbc`` (``python -m pip install pyodbc``) verwendet.
Der passende ODBC Treiber, MS SQL Server 2012 Native Client, wird zusätzlich benötigt.
Dieser kann von Microsoft bezogen werden.
zeep
----
Die Soap-Library ``zeep`` wird benutzt (``python -m pip install zeep``).
PyYaml
------
Die Library ``pyyaml`` wird für Config-Dateien benutzt (``python -m pip install pyyaml``).
Sphinx
------
Diese Dokumentation ist mit Sphinx geschrieben.
``python -m pip install sphinx``. Dokumentation ist im Unterverzeichnis
`docs` zu finden. Sie kann mittels ``make.bat html`` erzeugt werden,
dies ruft intern ``sphinx-build -M html source build`` auf. Die Dokumentation
der Python-API sollte evtl. vorher
mittels ``sphinx-apidoc -T -f ../src/PyAPplus64 -o source/generated`` erzeugt
oder aktualisiert werden. Evtl. können 2 Aufrufe von ``make.bat html`` sinnvoll
sein, falls sich die Struktur der Dokumentation ändert.
Diese Aufrufe werden von ``builddocs.sh`` automatisiert.
Die erzeugte Doku findet sich im Verzeichnis ``build/html``.
Pandas / SqlAlchemy / xlsxwriter
--------------------------------
Sollen Excel-Dateien mit Pandas erzeugt, werden, so muss Pandas, SqlAlchemy und xlsxwriter installiert sein
(`python -m pip install pandas sqlalchemy xlsxwriter`).

View File

@ -0,0 +1,92 @@
typische Anwendungsfälle
========================
einfache Admin-Aufgaben
-----------------------
Selten auftretende Admin-Aufgaben lassen sich gut mittels Python-Scripten automatisieren.
Es ist sehr einfach möglich, auf die DB, aber auch auf SOAP-Schnittstelle zuzugreifen.
Ich habe dies vor allem für Wartungsarbeiten an Anpassungstabellen, die für eigene Erweiterungen
entwickelt wurden, genutzt.
Als triviales Beispiel sucht folgender Code alle `DOCUMENTS` Einträge in
Artikeln (angezeigt als `Bild` in `ArtikelRec.aspx`), für die Datei, auf die
verwiesen wird, nicht im Dateisystem existiert. Diese fehlenden Dateien werden
ausgegeben und das Feld `DOCUMENTS` gelöscht. Das Löschen erfolgt dabei über
`useXML`, so dass die Felder `UPDDATE` und `UPDUSER` korrekt gesetzt werden.
.. literalinclude:: ../../examples/check_dokumente.py
:language: python
:linenos:
Man kann alle Python Bibliotheken nutzen. Als Erweiterung wäre es in obigem Script
zum Beispiel einfach möglich, alle BMP-Bilder zu suchen, nach PNG zu konvertieren
und den DB-Eintrag anzupassen.
Ad-hoc Reports
--------------
APplus erlaubt es mittels Jasper-Reports, flexible und schöne Reports zu erzeugen.
Dies funktioniert gut und ist für regelmäßig benutzte Reports sehr sinnvoll.
Für ad-hoc Reports, die nur sehr selten oder sogar nur einmal benutzt werden, ist die
Erstellung eines Jasper-Reports und die Einbindung in APplus jedoch zu zeitaufwändig.
Teilweise genügen die Ergebnisse einer SQL-Abfrage, die direkt im MS SQL Server abgesetzt
werden kann. Wird es etwas komplizierter oder sollen die Ergebnisse noch etwas verarbeitet
werden, bietet sich evtl. Python an.
Folgendes Script erzeugt zum Beispiel eine Excel-Tabelle, mit einer Übersicht,
welche Materialen wie oft für Artikel benutzt werden:
.. literalinclude:: ../../examples/adhoc_report.py
:language: python
:linenos:
Dieses kurze Script nutzt Standard-Pandas Methoden zur Erzeugung der Excel-Datei. Allerdings
sind diese Methoden in den Aufrufen von `pandasReadSql` und `exportToExcel` gekapselt,
so dass der Aufruf sehr einfach ist. Zudem ist es sehr einfach, die Verbindung zur Datenbank
und zum APP-Server mittels der YAML-Konfigdatei herzustellen. Bei diesem
Aufruf kann optional ein Nutzer und eine Umgebung übergeben werden, die die Standard-Werte aus
der YAML-Datei überschreiben. `pandasReadSql` nutzt intern `completeSQL`, so dass
der für die Umgebung korrekte Mandant automatisch verwendet wird.
Anbindung eigener Tools
-----------------------
Ursprünglich wurde `PyAPplus64` für die Anbindung einer APplus-Anpassung geschrieben. Dieses ist
als Windows-Service auf einem eigenen Rechner installiert und überwacht dort ein bestimmtes Verzeichnis.
Bei Änderungen an Dateien in diesem Verzeichnis (Hinzufügen, Ändern, Löschen) werden die Dateien verarbeitet
und die Ergebnisse an APplus gemeldet. Dafür werden DB-Operationen aber auch SOAP-Calls benutzt.
Ebenso wird auf die SysConf zugegriffen.
Viele solcher Anpassungen lassen sich gut und sinnvoll im App-Server einrichten und als Job regelmäßig aufrufen.
Im konkreten Fall wird jedoch für die Verarbeitung der Dateien viel Rechenzeit benötigt. Dies würde dadurch
verschlimmert, dass die Dateien nicht auf der gleichen Maschine wie der App-Server liegen und somit
viele, relativ langsame Dateizugriffe über das Netzwerk nötig wären. Hinzu kommt, dass die
Verarbeitung der Dateien dank existierender Bibliotheken in Python einfacher ist.
`PyAPplus64` kann für solche Anpassungen eine interessante Alternative zur Implementierung im App-Server
oder zur Entwicklung eines Tools ohne spezielle Bibliotheken sein. Nach Initialisierung des Servers::
import PyAPplus64
server = PyAPplus64.applus.applusFromConfigFile("my-applus-server.yaml")
bietet `P2APplus64` wie oben demonstriert einfachen lesenden und schreibenden Zugriff auf die DB. Hierbei
werden automatisch die für die Umgebung nötigen Mandanten zu den SQL Statements hinzugefügt. Zudem ist einfach ein
Zugriff auf die Sysconf möglich::
print (server.sysconf.getString("STAMM", "MYLAND"))
print (server.sysconf.getList("STAMM", "EULAENDER"))
Dank der Bibliothek `zeep` ist es auch sehr einfach möglich, auf beliebige SOAP-Methoden zuzugreifen.
Beispielsweise kann auf die Sys-Config auch händisch, d.h. durch direkten Aufruf einer SOAP-Methode,
zugegriffen werden::
client = server.server_conn.getClient("p2system", "SysConf");
print (client.service.getString("STAMM", "MYLAND"))

View File

@ -0,0 +1,42 @@
Beschreibung
============
Das Paket `PyAPplus64` enthält eine Sammlung von Python Tools für die Interaktion mit dem ERP-System APplus 6.4.
Es sollte auch für andere APplus Versionen nützlich sein.
Zielgruppe sind APplus-Administratoren und Anpassungs-Entwickler. `PyAPplus64` erlaubt u.a.
- einfacher Zugriff auf SOAP-Schnittstelle des APP-Servers
+ damit Zugriff auf SysConfig
+ Zugriff auf Tools wie z.B. `nextNumber` für Erzeugung der nächsten Nummer für ein Business-Object
+ ...
- Zugriff auf APplus DB per direktem DB-Zugriff und mittels SOAP
+ automatischer Aufruf von `completeSQL`, um per AppServer SQL-Statements um z.B. Mandanten erweitern zu lassen
+ Tools für einfache Benutzung von `useXML`, d.h. für das Einfügen, Löschen und Ändern von Datensätzen
mit Hilfe des APP-Servers. Genau wie bei Änderungen an Datensätzen über die Web-Oberfläche und im Gegensatz
zum direkten Zugriff über die Datenbank werden dabei evtl. zusätzliche
Checks ausgeführt, bestimmte Felder automatisch gesetzt oder bestimmte Aktionen angestoßen.
- das Duplizieren von Datensätzen
+ zu kopierende Felder aus XML-Definitionen werden ausgewertet
+ Abhängige Objekte können einfach ebenfalls mit-kopiert werden
+ Änderungen wie beispielsweise Nummer des Objektes möglich
+ Unterstützung für Kopieren dyn. Attribute
+ Anlage neuer Objekte mittels APP-Server
+ Datensätze können zwischen Systemen kopiert und auch in Dateien gespeichert werden
+ Beispiel: Kopieren von Artikeln mit Arbeitsplan und Stückliste zwischen Deploy- und Prod-System
- einfaches Erstellen von Excel-Reports aus SQL-Abfragen
+ mittels Pandas und XlsxWriter
+ einfache Wrapper um andere Libraries, spart aber Zeit
- ...
In `PyAPplus64` wurden die Features (vielleicht leicht verallgemeinert)
implementiert, die ich für konkrete Aufgabenstellungen benötigte. Ich gehe davon
aus, dass im Laufe der Zeit weitere Features hinzukommen.
Warnung
-------
`PyAPplus64` erlaubt den schreibenden Zugriff auf die APplus Datenbank und beliebige
Aufrufe von SOAP-Methoden. Unsachgemäße Nutzung kann Ihre Daten zerstören. Benutzen Sie
`PyAPplus64` daher bitte vorsichtig. Zudem kann ich leider nicht garantieren, dass `PyAPplus64` fehlerfrei ist.

56
docs/source/conf.py Normal file
View File

@ -0,0 +1,56 @@
import pathlib
import sys
sys.path.append('../src/')
# Configuration file for the Sphinx documentation builder.
#
# For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
project = 'PyAPplus64'
copyright = '2023, Thomas Tuerk'
author = 'Thomas Tuerk'
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
extensions = [
'sphinx.ext.duration',
'sphinx.ext.doctest',
'sphinx.ext.autodoc',
'sphinx.ext.autosummary',
]
templates_path = ['_templates']
exclude_patterns = [] # type: ignore
language = 'de'
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = 'nature'
# html_static_path = ['_static']
source_suffix = {
'.rst': 'restructuredtext',
}
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
'papersize': 'a4paper',
# The font size ('10pt', '11pt' or '12pt').
# 'pointsize': '11pt',
# Additional stuff for the LaTeX preamble.
# 'preamble': PREAMBLE
}
autodoc_type_aliases = {
'SqlValue': 'SqlValue'
}

73
docs/source/examples.rst Normal file
View File

@ -0,0 +1,73 @@
Beispiele
=========
Im Verzeichnis ``examples`` finden sich Python Dateien, die die Verwendung von `PyAPplus64` demonstrieren.
Config-Dateien
--------------
Viele Scripte teilen sich Einstellungen. Beispielsweise greifen fast alle Scripte irgendwie auf APplus zu und benötigen Informationen,
mit welchem APP-Server, welchem Web-Server und welcher Datenbank sie sich verbinden sollen. Solche Informationen, insbesondere die Passwörter, werden nicht in
jedem Script gespeichert, sondern nur in den Config-Dateien. Es bietet sich wohl meist an, 3 Konfigdateien zu erstellen, je eine für
das Deploy-, das Test- und das Prod-System. Ein Beispiel ist im Unterverzeichnis ``examples/applus-server.yaml`` zu finden.
.. literalinclude:: ../../examples/applus-server.yaml
:language: yaml
:linenos:
Damit nicht in jedem Script immer wieder neu die Konfig-Dateien ausgewählt werden müssen, werden die Konfigs für
das Prod-, Test- und Deploy-System in ``examples/applus_configs.py`` hinterlegt. Diese wird in allen Scripten importiert,
so dass das Config-Verzeichnis und die darin enthaltenen Configs einfach zur Verfügung stehen.
.. literalinclude:: ../../examples/applus_configs.py
:language: python
:linenos:
``check_dokumente.py``
-----------------------
Einfaches Beispiel für lesenden und schreibenden Zugriff auf APplus Datenbank.
.. literalinclude:: ../../examples/check_dokumente.py
:language: python
:lines: 6-
:linenos:
``adhoc_report.py``
-------------------
Sehr einfaches Beispiel zur Erstellung einer Excel-Tabelle aus einer SQL-Abfrage.
.. literalinclude:: ../../examples/adhoc_report.py
:language: python
:lines: 7-
:linenos:
``mengenabweichung.py``
-----------------------
Etwas komplizierteres Beispiel zur Erstellung einer Excel-Datei aus SQL-Abfragen.
.. literalinclude:: ../../examples/mengenabweichung.py
:language: python
:lines: 9-
:linenos:
``mengenabweichung_gui.py``
---------------------------
Beispiel für eine sehr einfache GUI, die die Eingabe einfacher Parameter erlaubt.
Die GUI wird um die Erzeugung von Excel-Dateien mit Mengenabweichungen gebaut.
.. literalinclude:: ../../examples/mengenabweichung_gui.pyw
:language: python
:lines: 7-
:linenos:
``copy_artikel.py``
-----------------------
Beispiel, wie Artikel inklusive Arbeitsplan und Stückliste dupliziert werden kann.
.. literalinclude:: ../../examples/copy_artikel.py
:language: python
:lines: 21-
:linenos:

View File

@ -0,0 +1,93 @@
PyAPplus64 package
==================
Submodules
----------
PyAPplus64.applus module
------------------------
.. automodule:: PyAPplus64.applus
:members:
:undoc-members:
:show-inheritance:
PyAPplus64.applus\_db module
----------------------------
.. automodule:: PyAPplus64.applus_db
:members:
:undoc-members:
:show-inheritance:
PyAPplus64.applus\_scripttool module
------------------------------------
.. automodule:: PyAPplus64.applus_scripttool
:members:
:undoc-members:
:show-inheritance:
PyAPplus64.applus\_server module
--------------------------------
.. automodule:: PyAPplus64.applus_server
:members:
:undoc-members:
:show-inheritance:
PyAPplus64.applus\_sysconf module
---------------------------------
.. automodule:: PyAPplus64.applus_sysconf
:members:
:undoc-members:
:show-inheritance:
PyAPplus64.applus\_usexml module
--------------------------------
.. automodule:: PyAPplus64.applus_usexml
:members:
:undoc-members:
:show-inheritance:
PyAPplus64.duplicate module
---------------------------
.. automodule:: PyAPplus64.duplicate
:members:
:undoc-members:
:show-inheritance:
PyAPplus64.pandas module
------------------------
.. automodule:: PyAPplus64.pandas
:members:
:undoc-members:
:show-inheritance:
PyAPplus64.sql\_utils module
----------------------------
.. automodule:: PyAPplus64.sql_utils
:members:
:undoc-members:
:show-inheritance:
PyAPplus64.utils module
-----------------------
.. automodule:: PyAPplus64.utils
:members:
:undoc-members:
:show-inheritance:
Module contents
---------------
.. automodule:: PyAPplus64
:members:
:undoc-members:
:show-inheritance:

11
docs/source/index.rst Normal file
View File

@ -0,0 +1,11 @@
PyAPplus64 Dokumentation
########################
.. toctree::
:maxdepth: 1
beschreibung
anwendungen
abhaengigkeiten.rst
examples.rst
generated/PyAPplus64

36
examples/adhoc_report.py Normal file
View File

@ -0,0 +1,36 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import PyAPplus64
import applus_configs
import pathlib
def main(confFile : pathlib.Path, outfile : str) -> None:
server = PyAPplus64.applus.applusFromConfigFile(confFile)
# Einfache SQL-Anfrage
sql1 = ("select Material, count(*) as Anzahl from ARTIKEL "
"group by MATERIAL having MATERIAL is not null "
"order by Anzahl desc")
df1 = PyAPplus64.pandas.pandasReadSql(server, sql1)
# Sql Select-Statements können auch über SqlStatementSelect zusammengebaut
# werden. Die ist bei vielen, komplizierten Bedingungen teilweise hilfreich.
sql2 = PyAPplus64.SqlStatementSelect("ARTIKEL")
sql2.addFields("Material", "count(*) as Anzahl")
sql2.addGroupBy("MATERIAL")
sql2.having.addConditionFieldIsNotNull("MATERIAL")
sql2.order = "Anzahl desc"
df2 = PyAPplus64.pandas.pandasReadSql(server, sql2)
# Ausgabe als Excel mit 2 Blättern
PyAPplus64.pandas.exportToExcel(outfile, [(df1, "Materialien"), (df2, "Materialien 2")], addTable=True)
if __name__ == "__main__":
main(applus_configs.serverConfYamlTest, "myout.xlsx")

View File

@ -0,0 +1,26 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
# Einstellung für die Verbindung mit dem APP-, Web- und DB-Server.
# Viele der Einstellungen sind im APplus Manager zu finden
appserver : {
server : "some-server",
port : 2037,
user : "asol.projects",
env : "default-umgebung" # hier wirklich Umgebung, nicht Mandant verwenden
}
webserver : {
baseurl : "http://some-server/APplusProd6/"
}
dbserver : {
server : "some-server",
db : "APplusProd6",
user : "SA",
password : "your-db-password"
}

View File

@ -0,0 +1,17 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import pathlib
basedir = pathlib.Path(__file__)
configdir = basedir.joinpath("config")
serverConfYamlDeploy = configdir.joinpath("applus-server-deploy.yaml")
serverConfYamlTest = configdir.joinpath("applus-server-test.yaml")
serverConfYamlProd = configdir.joinpath("applus-server-prod.yaml")

View File

@ -0,0 +1,31 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import pathlib
import PyAPplus64
import applus_configs
def main(confFile : pathlib.Path, docDir:str, updateDB:bool) -> None:
server = PyAPplus64.applus.applusFromConfigFile(confFile)
sql = PyAPplus64.sql_utils.SqlStatementSelect("ARTIKEL");
sql.addFields("ID", "ARTIKEL", "DOCUMENTS");
sql.where.addConditionFieldStringNotEmpty("DOCUMENTS");
for row in server.dbQueryAll(sql):
doc = pathlib.Path(docDir + row.DOCUMENTS);
if not doc.exists():
print("Bild '{}' für Artikel '{}' nicht gefunden".format(doc, row.ARTIKEL))
if updateDB:
upd = server.mkUseXMLRowUpdate("ARTIKEL", row.ID);
upd.addField("DOCUMENTS", None);
upd.update();
if __name__ == "__main__":
main(applus_configs.serverConfYamlTest, "somedir\\WebServer\\DocLib", False)

59
examples/copy_artikel.py Normal file
View File

@ -0,0 +1,59 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
# Dieses Script demonstriert, wie mit Hilfe PyAPplus64.duplicate
# BusinessObjekte dupliziert werden können.
# Dies ist sowohl in der gleichen DB als auch in anderen DBs möglich.
# So kann z.B. ein einzelner Artikel aus Test in Prod kopiert werden.
# Ebenso ist es möglich, die Daten in einer Datei zwischenzuspeichern und
# später irgendwo anders einzuspielen.
#
# Dies ist für Administrationszwecke gedacht. Anwendungsbeispiel wäre,
# dass ein Artikel mit langem Arbeitsplan und Stückliste im Test-System erstellt wird.
# Viele der Positionen enthalten Nachauflöse-Scripte, die im Test-System
# getestet werden. Diese vielen Scripte per Hand zu kopieren ist aufwändig
# und Fehleranfällig und kann mit solchen Admin-Scripten automatisiert werden.
import pathlib
import PyAPplus64
import applus_configs
import logging
import yaml
def main(confFile:pathlib.Path, artikel:str, artikelNeu:str|None=None) -> None:
# Server verbinden
server = PyAPplus64.applus.applusFromConfigFile(confFile)
# DuplicateBusinessObject für Artikel erstellen
dArt = PyAPplus64.duplicate.loadDBDuplicateArtikel(server, artikel);
# DuplicateBusinessObject zur Demonstration in YAML konvertieren und zurück
dArtYaml = yaml.dump(dArt);
print(dArtYaml);
dArt2 = yaml.load(dArtYaml, Loader=yaml.UnsafeLoader)
# Neue Artikel-Nummer bestimmen und DuplicateBusinessObject in DB schreiben
# Man könnte hier genauso gut einen anderen Server verwenden
if (artikelNeu is None):
artikelNeu = server.nextNumber("Artikel")
if not (dArt is None):
dArt.setFields({"artikel" : artikelNeu})
res = dArt.insert(server);
print(res);
if __name__ == "__main__":
# Logger Einrichten
logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger("PyAPplus64.applus_db");
# logger.setLevel(logging.ERROR)
main(applus_configs.serverConfYamlTest, "my-artikel", artikelNeu="my-artikel-copy")

View File

@ -0,0 +1,182 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
# Erzeugt Excel-Tabellen mit Werkstattaufträgen und Werkstattauftragspositionen mit Mengenabweichungen
import datetime
import PyAPplus64
import applus_configs
import pandas as pd # type: ignore
import pathlib
from typing import *
def ladeAlleWerkstattauftragMengenabweichungen(
server:PyAPplus64.APplusServer,
cond:PyAPplus64.SqlCondition|str|None=None) -> pd.DataFrame:
sql = PyAPplus64.sql_utils.SqlStatementSelect("WAUFTRAG w");
sql.addLeftJoin("personal p", "w.UPDUSER = p.PERSONAL")
sql.addFieldsTable("w", "ID", "BAUFTRAG", "POSITION")
sql.addFields("(w.MENGE-w.MENGE_IST) as MENGENABWEICHUNG")
sql.addFieldsTable("w", "MENGE", "MENGE_IST",
"APLAN as ARTIKEL", "NAME as ARTIKELNAME")
sql.addFields("w.UPDDATE", "p.NAME as UPDNAME")
sql.where.addConditionFieldGe("w.STATUS", 5)
sql.where.addCondition("abs(w.MENGE-w.MENGE_IST) > 0.001")
sql.where.addCondition(cond)
sql.order="w.UPDDATE"
dfOrg = PyAPplus64.pandas.pandasReadSql(server, sql);
# Add Links
df = dfOrg.copy();
df = df.drop(columns=["ID"]);
# df = df[['POSITION', 'BAUFTRAG', 'MENGE']] # reorder / filter columns
df['POSITION'] = PyAPplus64.pandas.mkHyperlinkDataframeColumn(dfOrg,
lambda r: r.POSITION,
lambda r: server.makeWebLinkWauftrag(
bauftrag=r.BAUFTRAG, accessid=r.ID))
df['BAUFTRAG'] = PyAPplus64.pandas.mkHyperlinkDataframeColumn(dfOrg,
lambda r: r.BAUFTRAG,
lambda r: server.makeWebLinkBauftrag(bauftrag=r.BAUFTRAG))
colNames = {
"BAUFTRAG" : "Betriebsauftrag",
"POSITION" : "Pos",
"MENGENABWEICHUNG" : "Mengenabweichung",
"MENGE" : "Menge",
"MENGE_IST" : "Menge-Ist",
"ARTIKEL" : "Artikel",
"ARTIKELNAME" : "Artikel-Name",
"UPDDATE" : "geändert am",
"UPDNAME" : "geändert von"
}
df.rename(columns=colNames, inplace=True);
return df
def ladeAlleWerkstattauftragPosMengenabweichungen(
server : PyAPplus64.APplusServer,
cond:PyAPplus64.SqlCondition|str|None=None) -> pd.DataFrame:
sql = PyAPplus64.sql_utils.SqlStatementSelect("WAUFTRAGPOS w");
sql.addLeftJoin("personal p", "w.UPDUSER = p.PERSONAL")
sql.addFieldsTable("w", "ID", "BAUFTRAG", "POSITION", "AG")
sql.addFields("(w.MENGE-w.MENGE_IST) as MENGENABWEICHUNG")
sql.addFieldsTable("w", "MENGE", "MENGE_IST", "APLAN as ARTIKEL")
sql.addFields("w.UPDDATE", "p.NAME as UPDNAME")
sql.where.addConditionFieldEq("w.STATUS", 4)
sql.where.addCondition("abs(w.MENGE-w.MENGE_IST) > 0.001")
sql.where.addCondition(cond)
sql.order="w.UPDDATE"
dfOrg = PyAPplus64.pandas.pandasReadSql(server, sql);
# Add Links
df = dfOrg.copy();
df = df.drop(columns=["ID"]);
df['POSITION'] = PyAPplus64.pandas.mkHyperlinkDataframeColumn(dfOrg,
lambda r: r.POSITION,
lambda r: server.makeWebLinkWauftrag(
bauftrag=r.BAUFTRAG, accessid=r.ID))
df['BAUFTRAG'] = PyAPplus64.pandas.mkHyperlinkDataframeColumn(dfOrg,
lambda r: r.BAUFTRAG,
lambda r: server.makeWebLinkBauftrag(bauftrag=r.BAUFTRAG))
df['AG'] = PyAPplus64.pandas.mkHyperlinkDataframeColumn(dfOrg,
lambda r: r.AG,
lambda r: server.makeWebLinkWauftragPos(
bauftrag=r.BAUFTRAG, position=r.POSITION, accessid=r.ID))
# Demo zum Hinzufügen einer berechneten Spalte
# df['BAUFPOSAG'] = PyAPplus64.pandas.mkDataframeColumn(dfOrg,
# lambda r: "{}.{} AG {}".format(r.BAUFTRAG, r.POSITION, r.AG))
# Rename Columns
colNames = {
"BAUFTRAG" : "Betriebsauftrag",
"POSITION" : "Pos",
"AG" : "AG",
"MENGENABWEICHUNG" : "Mengenabweichung",
"MENGE" : "Menge",
"MENGE_IST" : "Menge-Ist",
"ARTIKEL" : "Artikel",
"UPDDATE" : "geändert am",
"UPDNAME" : "geändert von"
}
df.rename(columns=colNames, inplace=True);
return df
def computeInYearMonthCond(field : str, year:int|None=None,
month:int|None=None) -> PyAPplus64.SqlCondition | None:
if not (year is None):
if month is None:
return PyAPplus64.sql_utils.SqlConditionDateTimeFieldInYear(field, year)
else:
return PyAPplus64.sql_utils.SqlConditionDateTimeFieldInMonth(field, year, month)
else:
return None
def computeFileName(year:int|None=None, month:int|None=None) -> str:
if year is None:
return 'mengenabweichungen-all.xlsx';
else:
if month is None:
return 'mengenabweichungen-{:04d}.xlsx'.format(year);
else:
return 'mengenabweichungen-{:04d}-{:02d}.xlsx'.format(year, month);
def _exportInternal(server: PyAPplus64.APplusServer, fn:str,
cond:Union[PyAPplus64.SqlCondition, str, None]) -> int:
df1 = ladeAlleWerkstattauftragMengenabweichungen(server, cond)
df2 = ladeAlleWerkstattauftragPosMengenabweichungen(server, cond)
print ("erzeuge " + fn);
PyAPplus64.pandas.exportToExcel(fn, [(df1, "WAuftrag"), (df2, "WAuftrag-Pos")], addTable=True)
return len(df1.index) + len(df2.index)
def exportVonBis(server: PyAPplus64.APplusServer, fn:str,
von:datetime.datetime|None, bis:datetime.datetime|None) -> int:
cond = PyAPplus64.sql_utils.SqlConditionDateTimeFieldInRange("w.UPDDATE", von, bis)
return _exportInternal(server, fn, cond)
def exportYearMonth(server: PyAPplus64.APplusServer,
year:int|None=None, month:int|None=None) -> int:
cond=computeInYearMonthCond("w.UPDDATE", year=year, month=month)
fn = computeFileName(year=year, month=month)
return _exportInternal(server, fn, cond)
def computePreviousMonthYear(cyear : int, cmonth :int) -> Tuple[int, int]:
if cmonth == 1:
return (cyear-1, 12)
else:
return (cyear, cmonth-1);
def computeNextMonthYear(cyear : int, cmonth :int) -> Tuple[int, int]:
if cmonth == 12:
return (cyear+1, 1)
else:
return (cyear, cmonth+1);
def main(confFile : str|pathlib.Path, user:str|None=None, env:str|None=None) -> None:
server = PyAPplus64.applusFromConfigFile(confFile, user=user, env=env)
now = datetime.date.today()
(cmonth, cyear) = (now.month, now.year)
(pyear, pmonth) = computePreviousMonthYear(cyear, cmonth);
# Ausgaben
exportYearMonth(server, cyear, cmonth) # Aktueller Monat
exportYearMonth(server, pyear, pmonth) # Vorheriger Monat
# export(cyear) # aktuelles Jahr
# export(cyear-1) # letztes Jahr
# export() # alles
if __name__ == "__main__":
main(applus_configs.serverConfYamlTest)

View File

@ -0,0 +1,98 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import PySimpleGUI as sg # type: ignore
import mengenabweichung
import datetime
import PyAPplus64
import applus_configs
import pathlib
from typing import *
def parseDate (dateS:str) -> Tuple[datetime.datetime|None, bool]:
if dateS is None or dateS == '':
return (None, True)
else:
try:
return (datetime.datetime.strptime(dateS, '%d.%m.%Y'), True)
except:
sg.popup_error("Fehler beim Parsen des Datums '{}'".format(dateS))
return (None, False)
def createFile(server:PyAPplus64.APplusServer, fileS:str, vonS:str, bisS:str)->None:
(von, vonOK) = parseDate(vonS)
if not vonOK: return
(bis, bisOK) = parseDate(bisS)
if not bisOK: return
if (fileS is None) or fileS == '':
sg.popup_error("Es wurde keine Ausgabedatei ausgewählt.")
return
else:
file = pathlib.Path(fileS)
c = mengenabweichung.exportVonBis(server, file.as_posix(), von, bis)
sg.popup_ok("{} Datensätze erfolgreich in Datei '{}' geschrieben.".format(c, file))
def main(confFile : str|pathlib.Path, user:str|None=None, env:str|None=None) -> None:
server = PyAPplus64.applusFromConfigFile(confFile, user=user, env=env)
layout = [
[sg.Text(('Bitte geben Sie an, für welchen Zeitraum die '
'Mengenabweichungen ausgegeben werden sollen:'))],
[sg.Text('Von (einschließlich)', size=(15,1)), sg.InputText(key='Von'),
sg.CalendarButton("Kalender", close_when_date_chosen=True,
target="Von", format='%d.%m.%Y')],
[sg.Text('Bis (ausschließlich)', size=(15,1)), sg.InputText(key='Bis'),
sg.CalendarButton("Kalender", close_when_date_chosen=True,
target="Bis", format='%d.%m.%Y')],
[sg.Text('Ausgabedatei', size=(15,1)), sg.InputText(key='File'),
sg.FileSaveAs(button_text="wählen", target="File",
file_types = (('Excel Files', '*.xlsx'),),
default_extension = ".xlsx")],
[sg.Button("Aktueller Monat"), sg.Button("Letzter Monat"),
sg.Button("Aktuelles Jahr"), sg.Button("Letztes Jahr")],
[sg.Button("Speichern"), sg.Button("Beenden")]
]
systemName = server.scripttool.getSystemName() + "/" + server.scripttool.getMandant()
window = sg.Window("Mengenabweichung " + systemName, layout)
now = datetime.date.today()
(cmonth, cyear) = (now.month, now.year)
(pyear, pmonth) = mengenabweichung.computePreviousMonthYear(cyear, cmonth);
(nyear, nmonth) = mengenabweichung.computeNextMonthYear(cyear, cmonth);
while True:
event, values = window.read()
if event == sg.WIN_CLOSED or event == 'Beenden':
break
if event == 'Aktueller Monat':
window['Von'].update(value="01.{:02d}.{:04d}".format(cmonth, cyear));
window['Bis'].update(value="01.{:02d}.{:04d}".format(nmonth, nyear));
if event == 'Letzter Monat':
window['Von'].update(value="01.{:02d}.{:04d}".format(pmonth, pyear));
window['Bis'].update(value="01.{:02d}.{:04d}".format(cmonth, cyear));
if event == 'Aktuelles Jahr':
window['Von'].update(value="01.01.{:04d}".format(cyear));
window['Bis'].update(value="01.01.{:04d}".format(cyear+1));
if event == 'Letztes Jahr':
window['Von'].update(value="01.01.{:04d}".format(cyear-1));
window['Bis'].update(value="01.01.{:04d}".format(cyear));
if event == 'Speichern':
try:
createFile(server, values.get('File', None),
values.get('Von', None), values.get('Bis', None))
except Exception as e:
sg.popup_error_with_traceback("Beim Erzeugen der Excel-Datei trat ein Fehler auf:", e);
window.close()
if __name__ == "__main__":
main(applus_configs.serverConfYamlProd)

4
mypy.ini Normal file
View File

@ -0,0 +1,4 @@
[mypy]
disallow_incomplete_defs = True
disallow_untyped_defs = True

33
pyproject.toml Normal file
View File

@ -0,0 +1,33 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "PyAPplus64"
version = "1.0"
authors = [
{ name="Thomas Tuerk", email="kontakt@thomas-tuerk.de" },
]
description = "Verschiedene Hilfsmittel, um mit dem ERP System APplus zu interagieren. Dieses Packet wurde für APplus 6.4 entwickelt, funktioniert vermutlich aber auch mit anderen Versionen."
readme = "README.md"
requires-python = ">=3.6"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Intended Audience :: Other Audience",
"Operating System :: Microsoft :: Windows"
]
dependencies = [
'pyodbc',
'PyYAML',
'SQLAlchemy',
'pandas',
'XlsxWriter',
'zeep'
]
[project.urls]
homepage = "https://www.thomas-tuerk.de/de/pyapplus64"
repository = "https://git.thomas-tuerk.de/thtuerk/PyAPplus64"
documentation = "https://www.thomas-tuerk.de/assets/PyAPplus64/html/index.html"

View File

@ -0,0 +1,30 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
from . import utils
from . import applus_db
from . import applus_scripttool
from . import applus_server
from . import applus_sysconf
from . import applus_usexml
from . import applus
from . import sql_utils
from . import duplicate
from . import utils
from .applus import APplusServer, applusFromConfigFile
from .sql_utils import (
SqlCondition,
SqlStatement,
SqlStatementSelect
)
try:
from . import pandas
except:
pass

287
src/PyAPplus64/applus.py Normal file
View File

@ -0,0 +1,287 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#-*- coding: utf-8 -*-
from . import applus_db
from . import applus_server
from . import applus_sysconf
from . import applus_scripttool
from . import applus_usexml
from . import sql_utils
import yaml
import urllib.parse
from zeep import Client
import pyodbc # type: ignore
from typing import *
if TYPE_CHECKING:
from _typeshed import FileDescriptorOrPath
class APplusServer:
"""
Verbindung zu einem APplus DB und App Server mit Hilfsfunktionen für den komfortablen Zugriff.
:param db_settings: die Einstellungen für die Verbindung mit der Datenbank
:type db_settings: APplusDBSettings
:param server_settings: die Einstellungen für die Verbindung mit dem APplus App Server
:type server_settings: APplusAppServerSettings
:param web_settings: die Einstellungen für die Verbindung mit dem APplus Web Server
:type web_settings: APplusWebServerSettings
"""
def __init__(self, db_settings : applus_db.APplusDBSettings, server_settings : applus_server.APplusAppServerSettings, web_settings : applus_server.APplusWebServerSettings):
self.db_settings : applus_db.APplusDBSettings = db_settings
"""Die Einstellungen für die Datenbankverbindung"""
self.web_settings : applus_server.APplusWebServerSettings = web_settings
"""Die Einstellungen für die Datenbankverbindung"""
self.db_conn = db_settings.connect()
"""
Eine pyodbc-Connection zur APplus DB. Diese muss genutzt werden, wenn mehrere Operationen in einer Transaktion
genutzt werden sollen. Ansonsten sind die Hilfsmethoden wie :meth:`APplusServer.dbQuery` zu bevorzugen.
Diese Connection kann in Verbindung mit den Funktionen aus :mod:`PyAPplus64.applus_db` genutzt werden.
"""
self.server_conn : applus_server.APplusServerConnection = applus_server.APplusServerConnection(server_settings);
"""erlaubt den Zugriff auf den AppServer"""
self.sysconf : applus_sysconf.APplusSysConf = applus_sysconf.APplusSysConf(self);
"""erlaubt den Zugriff auf die Sysconfig"""
self.scripttool : applus_scripttool.APplusScriptTool = applus_scripttool.APplusScriptTool(self);
"""erlaubt den einfachen Zugriff auf Funktionen des ScriptTools"""
self.client_table = self.server_conn.getClient("p2core","Table");
self.client_xml = self.server_conn.getClient("p2core","XML");
self.client_nummer = self.server_conn.getClient("p2system", "Nummer")
def reconnectDB(self) -> None:
try:
self.db_conn.close()
except:
pass
self.db_conn = self.db_settings.connect()
def completeSQL(self, sql : sql_utils.SqlStatement, raw:bool=False) -> str:
"""
Vervollständigt das SQL-Statement. Es wird z.B. der Mandant hinzugefügt.
:param sql: das SQL Statement
:type sql: sql_utils.SqlStatement
:param raw: soll completeSQL ausgeführt werden? Falls True, wird die Eingabe zurückgeliefert
:type raw: boolean
:return: das vervollständigte SQL-Statement
:rtype: str
"""
if raw:
return str(sql)
else:
return self.client_table.service.getCompleteSQL(sql);
def dbQueryAll(self, sql : sql_utils.SqlStatement, *args:Any, raw:bool=False,
apply:Optional[Callable[[pyodbc.Row],Any]]=None) -> Any:
"""Führt eine SQL Query aus und liefert alle Zeilen zurück. Das SQL wird zunächst
vom Server angepasst, so dass z.B. Mandanteninformation hinzugefügt werden."""
sqlC = self.completeSQL(sql, raw=raw);
return applus_db.rawQueryAll(self.db_conn, sqlC, *args, apply=apply)
def dbQuerySingleValues(self, sql : sql_utils.SqlStatement, *args:Any, raw:bool=False) -> Sequence[Any]:
"""Führt eine SQL Query aus, die nur eine Spalte zurückliefern soll."""
return self.dbQueryAll(sql, *args, raw=raw, apply=lambda r: r[0])
def dbQuery(self, sql : sql_utils.SqlStatement, f : Callable[[pyodbc.Row], None], *args : Any, raw:bool=False) -> None:
"""Führt eine SQL Query aus und führt für jede Zeile die übergeben Funktion aus.
Das SQL wird zunächst vom Server angepasst, so dass z.B. Mandanteninformation hinzugefügt werden."""
sqlC = self.completeSQL(sql, raw=raw);
applus_db.rawQuery(self.db_conn, sqlC, f, *args)
def dbQuerySingleRow(self, sql:sql_utils.SqlStatement, *args:Any, raw:bool=False) -> Optional[pyodbc.Row]:
"""Führt eine SQL Query aus, die maximal eine Zeile zurückliefern soll. Diese Zeile wird geliefert."""
sqlC = self.completeSQL(sql, raw=raw);
return applus_db.rawQuerySingleRow(self.db_conn, sqlC, *args)
def dbQuerySingleRowDict(self, sql:sql_utils.SqlStatement, *args:Any, raw:bool=False) -> Optional[Dict[str, Any]]:
"""Führt eine SQL Query aus, die maximal eine Zeile zurückliefern soll.
Diese Zeile wird als Dictionary geliefert."""
row = self.dbQuerySingleRow(sql, *args, raw=raw);
if row:
return applus_db.row_to_dict(row);
else:
return None
def dbQuerySingleValue(self, sql:sql_utils.SqlStatement, *args:Any, raw:bool=False) -> Any:
"""Führt eine SQL Query aus, die maximal einen Wert zurückliefern soll.
Dieser Wert oder None wird geliefert."""
sqlC = self.completeSQL(sql, raw=raw);
return applus_db.rawQuerySingleValue(self.db_conn, sqlC, *args)
def isDBTableKnown(self, table : str) -> bool:
"""Prüft, ob eine Tabelle im System bekannt ist"""
sql = "select count(*) from SYS.TABLES T where T.NAME=?"
c = self.dbQuerySingleValue(sql, table);
return (c > 0)
def getClient(self, package : str, name : str) -> Client:
"""Erzeugt einen zeep - Client.
Mittels dieses Clients kann die WSDL Schnittstelle angesprochen werden.
Wird als *package* "p2core" und als *name* "Table" verwendet und der
resultierende client "client" genannt, dann kann
z.B. mittels "client.service.getCompleteSQL(sql)" vom AppServer ein Vervollständigen
des SQLs angefordert werden.
:param package: das Packet, z.B. "p2core"
:type package: str
:param name: der Name im Packet, z.B. "Table"
:type package: string
:return: den Client
:rtype: Client
"""
return self.server_conn.getClient(package, name);
def getTableFields(self, table:str, isComputed:Optional[bool]=None) -> Set[str]:
"""
Liefert die Namen aller Felder einer Tabelle.
:param table: Name der Tabelle
:param isComputed: wenn gesetzt, werden nur die Felder geliefert, die berechnet werden oder nicht berechnet werden
:return: Liste aller Feld-Namen
:rtype: {str}
"""
sql = sql_utils.SqlStatementSelect("SYS.TABLES T")
join = sql.addInnerJoin("SYS.COLUMNS C");
join.on.addConditionFieldsEq("T.Object_ID", "C.Object_ID")
if not (isComputed == None):
join.on.addConditionFieldEq("c.is_computed", isComputed)
sql.addFields("C.NAME")
sql.where.addConditionFieldEq("t.name", sql_utils.SqlParam())
return sql_utils.normaliseDBfieldSet(self.dbQueryAll(sql, table, apply=lambda r : r.NAME));
def getUniqueFieldsOfTable(self, table : str) -> Dict[str, List[str]]:
"""
Liefert alle Spalten einer Tabelle, die eindeutig sein müssen.
Diese werden als Dictionary gruppiert nach Index-Namen geliefert.
Jeder Eintrag enthält eine Liste von Feldern, die zusammen eindeutig sein
müssen.
"""
return applus_db.getUniqueFieldsOfTable(self.db_conn, table)
def useXML(self, xml : str) -> Any:
"""Ruft ``p2core.xml.usexml`` auf. Wird meist durch ein ``UseXMLRow-Objekt`` aufgerufen."""
return self.client_xml.service.useXML(xml);
def mkUseXMLRowInsert(self, table : str) -> applus_usexml.UseXmlRowInsert:
"""
Erzeugt ein Objekt zum Einfügen eines neuen DB-Eintrags.
:param table: DB-Tabelle in die eingefügt werden soll
:type table: str
:return: das XmlRow-Objekt
:rtype: applus_usexml.UseXmlRowInsert
"""
return applus_usexml.UseXmlRowInsert(self, table)
def mkUseXMLRowUpdate(self, table : str, id : int) -> applus_usexml.UseXmlRowUpdate:
return applus_usexml.UseXmlRowUpdate(self, table, id)
def mkUseXMLRowInsertOrUpdate(self, table : str) -> applus_usexml.UseXmlRowInsertOrUpdate:
"""
Erzeugt ein Objekt zum Einfügen oder Updaten eines DB-Eintrags.
:param table: DB-Tabelle in die eingefügt werden soll
:type table: string
:return: das XmlRow-Objekt
:rtype: applus_usexml.UseXmlRowInsertOrUpdate
"""
return applus_usexml.UseXmlRowInsertOrUpdate(self, table)
def mkUseXMLRowDelete(self, table:str, id:int) -> applus_usexml.UseXmlRowDelete :
return applus_usexml.UseXmlRowDelete(self, table, id)
def execUseXMLRowDelete(self, table:str, id:int) -> None:
delRow = self.mkUseXMLRowDelete(table, id)
delRow.delete();
def nextNumber(self, obj : str) -> str:
"""
Erstellt eine neue Nummer für das Objekt und legt diese Nummer zurück.
"""
return self.client_nummer.service.nextNumber(obj)
def makeWebLink(self, base : str, **kwargs : Any) -> str :
if not self.web_settings.baseurl:
raise Exception("keine Webserver-BaseURL gesetzt");
url = str(self.web_settings.baseurl) + base;
firstArg = True
for arg, argv in kwargs.items():
if not (argv == None):
if firstArg:
firstArg = False;
url += "?"
else:
url += "&"
url += arg + "=" + urllib.parse.quote(str(argv))
return url;
def makeWebLinkWauftragPos(self, **kwargs : Any) -> str:
return self.makeWebLink("wp/wauftragPosRec.aspx", **kwargs);
def makeWebLinkWauftrag(self, **kwargs : Any) -> str :
return self.makeWebLink("wp/wauftragRec.aspx", **kwargs);
def makeWebLinkBauftrag(self, **kwargs : Any) -> str :
return self.makeWebLink("wp/bauftragRec.aspx", **kwargs);
def applusFromConfigDict(yamlDict:Dict[str, Any], user:Optional[str]=None, env:Optional[str]=None) -> APplusServer:
"""Läd Einstellungen aus einer Config und erzeugt daraus ein APplus-Objekt"""
if user is None or user=='':
user = yamlDict["appserver"]["user"]
if env is None or env=='':
env = yamlDict["appserver"]["env"]
app_server = applus_server.APplusAppServerSettings(
appserver=yamlDict["appserver"]["server"],
appserverPort=yamlDict["appserver"]["port"],
user=user, # type: ignore
env=env)
web_server = applus_server.APplusWebServerSettings(
baseurl=yamlDict.get("webserver", {}).get("baseurl", None)
)
dbparams = applus_db.APplusDBSettings(
server=yamlDict["dbserver"]["server"],
database=yamlDict["dbserver"]["db"],
user=yamlDict["dbserver"]["user"],
password=yamlDict["dbserver"]["password"]);
return APplusServer(dbparams, app_server, web_server);
def applusFromConfigFile(yamlfile : 'FileDescriptorOrPath',
user:Optional[str]=None, env:Optional[str]=None) -> APplusServer:
"""Läd Einstellungen aus einer Config-Datei und erzeugt daraus ein APplus-Objekt"""
yamlDict = {}
with open(yamlfile, "r") as stream:
yamlDict = yaml.safe_load(stream)
return applusFromConfigDict(yamlDict, user=user, env=env)
def applusFromConfig(yamlString : str, user:Optional[str]=None, env:Optional[str]=None) -> APplusServer:
"""Läd Einstellungen aus einer Config-Datei und erzeugt daraus ein APplus-Objekt"""
yamlDict = yaml.safe_load(yamlString)
return applusFromConfigDict(yamlDict, user=user, env=env)

171
src/PyAPplus64/applus_db.py Normal file
View File

@ -0,0 +1,171 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#-*- coding: utf-8 -*-
import pyodbc # type: ignore
import logging
from .sql_utils import SqlStatement
from . import sql_utils
from typing import *
logger = logging.getLogger(__name__);
class APplusDBSettings:
"""
Einstellungen, mit welcher DB sich verbunden werden soll.
"""
def __init__(self, server : str, database : str, user : str, password : str):
self.server = server
self.database = database;
self.user = user
self.password = password
def getConnectionString(self) -> str:
"""Liefert den ODBC Connection-String für die Verbindung.
:return: den Connection-String
"""
return ("Driver={SQL Server Native Client 11.0};"
"Server="+self.server+";"
"Database="+self.database+";"
"UID="+self.user+";"
"PWD="+self.password + ";")
def connect(self) -> pyodbc.Connection:
"""Stellt eine neue Verbindung her und liefert diese zurück.
"""
return pyodbc.connect(self.getConnectionString())
def row_to_dict(row : pyodbc.Row) -> Dict[str, Any]:
"""Konvertiert eine Zeile in ein Dictionary"""
return dict(zip([t[0] for t in row.cursor_description], row))
def _logSQLWithArgs(sql : SqlStatement, *args : Any) -> None:
if args:
logger.debug("executing '{}' with args {}".format(str(sql), str(args)))
else:
logger.debug("executing '{}'".format(str(sql)))
def rawQueryAll(
cnxn : pyodbc.Connection,
sql : SqlStatement,
*args : Any,
apply : Optional[Callable[[pyodbc.Row], Any]]=None) -> Sequence[Any]:
"""
Führt eine SQL Query direkt aus und liefert alle Zeilen zurück.
Wenn apply gesetzt ist, wird die Funktion auf jeder Zeile ausgeführt und das Ergebnis ausgeben, die nicht None sind.
"""
_logSQLWithArgs(sql, *args)
with cnxn.cursor() as cursor:
cursor.execute(str(sql), *args)
rows = cursor.fetchall();
if apply is None:
return rows
else:
res = []
for r in rows:
rr = apply(r)
if not (rr == None):
res.append(rr)
return res
def rawQuery(cnxn : pyodbc.Connection, sql : sql_utils.SqlStatement, f : Callable[[pyodbc.Row], None], *args : Any) -> None:
"""Führt eine SQL Query direkt aus und führt für jede Zeile die übergeben Funktion aus."""
_logSQLWithArgs(sql, *args)
with cnxn.cursor() as cursor:
cursor.execute(str(sql), *args)
for row in cursor:
f(row);
def rawQuerySingleRow(cnxn : pyodbc.Connection, sql : SqlStatement, *args : Any) -> Optional[pyodbc.Row]:
"""Führt eine SQL Query direkt aus, die maximal eine Zeile zurückliefern soll. Diese Zeile wird geliefert."""
_logSQLWithArgs(sql, *args)
with cnxn.cursor() as cursor:
cursor.execute(str(sql), *args)
return cursor.fetchone();
def rawQuerySingleValue(cnxn : pyodbc.Connection, sql : SqlStatement, *args : Any) -> Any:
"""Führt eine SQL Query direkt aus, die maximal einen Wert zurückliefern soll. Dieser Wert oder None wird geliefert."""
_logSQLWithArgs(sql, *args)
with cnxn.cursor() as cursor:
cursor.execute(str(sql), *args)
row = cursor.fetchone();
if row:
return row[0];
else:
return None;
def getUniqueFieldsOfTable(cnxn : pyodbc.Connection, table : str) -> Dict[str, List[str]] :
"""
Liefert alle Spalten einer Tabelle, die eindeutig sein müssen.
Diese werden als Dictionary gruppiert nach Index-Namen geliefert.
Jeder Eintrag enthält eine Liste von Feldern, die zusammen eindeutig sein
müssen.
"""
sql = sql_utils.SqlStatementSelect("sys.indexes AS i")
join = sql.addInnerJoin("sys.index_columns AS ic")
join.on.addCondition("i.OBJECT_ID = ic.OBJECT_ID")
join.on.addCondition("i.index_id = ic.index_id")
sql.where.addConditionFieldEq("OBJECT_NAME(ic.OBJECT_ID)", table)
sql.where.addConditionFieldEq("i.is_unique", True)
sql.addFields("i.name AS INDEX_NAME", "COL_NAME(ic.OBJECT_ID,ic.column_id) AS COL")
_logSQLWithArgs(sql)
indices : Dict[str, List[str]] = {}
with cnxn.cursor() as cursor:
cursor.execute(str(sql))
for row in cursor:
cols = indices.get(row.INDEX_NAME, [])
cols.append(sql_utils.normaliseDBfield(row.COL))
indices[row.INDEX_NAME] = cols
return indices
class DBTableIDs():
"""Klasse, die Mengen von IDs gruppiert nach Tabellen speichert"""
def __init__(self) -> None:
self.data : Dict[str, Set[int]]= {}
def add(self, table:str, *ids : int) -> None:
"""
fügt Eintrag hinzu
:param table: die Tabelle
:type table: str
:param id: die ID
"""
table = table.upper()
if not (table in self.data):
self.data[table] = set(ids);
else:
self.data[table].update(ids)
def getTable(self, table : str) -> Set[int]:
"""
Liefert die Menge der IDs für eine bestimmte Tabelle.
:param table: die Tabelle
:type table: str
:return: die IDs
"""
table = table.upper()
return self.data.get(table, set())
def __str__(self) -> str:
return str(self.data)

View File

@ -0,0 +1,148 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#-*- coding: utf-8 -*-
from .applus import APplusServer
from . import sql_utils
import lxml.etree as ET # type: ignore
from typing import *
class XMLDefinition:
"""Repräsentation eines XML-Dokuments"""
def __init__(self, root : ET.Element) -> None:
self.root : ET.Element = root
"""das Root-Element, repräsentiert "object" aus Datei."""
def __str__(self) -> str:
return ET.tostring(self.root, encoding = "unicode")
def getDuplicate(self) -> Tuple[Set[str], bool]:
"""
Extrahiert alle Properties, die für Duplizieren konfiguriert sind.
Zudem wird ein Flag geliefert, ob diese Properties ein oder ausgeschlossen werden sollen.
:return: Tuple aus allen Properties und ob dies aus- (True) oder ein-(False) zuschließen sind.
:rtype: Tuple[Set[str], bool]
"""
res : Set[str] = set()
excl = True;
dupl = self.root.find("duplicate")
if (dupl is None):
return (res, excl);
exclS = dupl.get("type", default="exclude")
excl = exclS.casefold() == "exclude"
for e in dupl.findall("{ref}property"):
v = e.get("ref")
if not (v is None):
res.add(sql_utils.normaliseDBfield(str(v)))
return (res, excl)
class APplusScriptTool:
"""
Zugriff auf AppServer ScriptTool
:param server: die Verbindung zum Server
:type server: APplusServerConnection
"""
def __init__(self, server : APplusServer) -> None:
self.client = server.getClient("p2script", "ScriptTool")
def getCurrentDate(self) -> str:
return self.client.service.getCurrentDate()
def getCurrentTime(self) -> str:
return self.client.service.getCurrentTime()
def getCurrentDateTime(self) -> str:
return self.client.service.getCurrentDateTime()
def getLoginName(self) -> str:
return self.client.service.getLoginName()
def getUserName(self) -> str:
return self.client.service.getUserName()
def getUserFullName(self) -> str:
return self.client.service.getUserFullName()
def getSystemName(self) -> str:
return self.client.service.getSystemName()
def getXMLDefinitionString(self, obj:str, mandant:str="") -> str:
"""
Läd die XML-Defintion als String vom APPServer. Auch wenn kein XML-Dokument im Dateisystem gefunden wird,
wird ein String zurückgeliefert, der einen leeren Top-"Object" Knoten enthält. Für gefundene XML-Dokumente
gibt es zusätzlich einen Top-"MD5"-Knoten.
:param obj: das Objekt, dessen Definition zu laden ist, "Artikel" läd z.B. "ArtikelDefinition.xml"
:type obj: str
:param mandant: der Mandant, dessen XML-Doku geladen werden soll, wenn "" wird der Standard-Mandant verwendet
:type mandant: str optional
:return: das gefundene XML-Dokument als String
:rtype: str
"""
return self.client.service.getXMLDefinition2(obj, "")
def getXMLDefinition(self, obj:str, mandant:str="", checkFileExists:bool=False) -> Optional[ET.Element]:
"""
Läd die XML-Definition als String vom APPServer. und parst das XML in ein minidom-Dokument.
:param obj: das Objekt, dessen Definition zu laden ist, "Artikel" läd z.B. "ArtikelDefinition.xml"
:type obj: str
:param mandant: der Mandant, dessen XML-Doku geladen werden soll, wenn "" wird der Standard-Mandant verwendet
:type mandant: str optional
:return: das gefundene und mittels ElementTree geparste XML-Dokument
:rtype: ET.Element
"""
return ET.fromstring(self.getXMLDefinitionString(obj, mandant=mandant))
def getXMLDefinitionObj(self, obj:str, mandant:str="") -> Optional[XMLDefinition]:
"""
Benutzt getXMLDefinitionObj und liefert den Top-Level "Object" Knoten zurück, falls zusätzlich
ein MD5 Knoten existiert, also falls das Dokument wirklich vom Dateisystem geladen werden konnte.
Ansonten wird None geliefert.
:param obj: das Objekt, dess Definition zu laden ist, "Artikel" läd z.B. "ArtikelDefinition.xml"
:type obj: str
:param mandant: der Mandant, dessen XML-Doku geladen werden soll, wenn "" wird der Standard-Mandant verwendet
:type mandant: str optional
:return: das gefundene und mittels ElementTree geparste XML-Dokument
:rtype: Optional[XMLDefinition]
"""
e = self.getXMLDefinition(obj, mandant=mandant);
if e is None:
return None
if e.find("md5") is None:
return None;
o = e.find("object")
if o is None:
return None
else:
return XMLDefinition(o);
def getMandant(self) -> str:
"""
Liefert den aktuellen Mandanten
"""
return self.client.service.getCurrentClientProperty("MANDANTID")
def getMandantName(self) -> str:
"""
Liefert den Namen des aktuellen Mandanten
"""
return self.client.service.getCurrentClientProperty("NAME")

View File

@ -0,0 +1,87 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#-*- coding: utf-8 -*-
from requests import Session # type: ignore
from requests.auth import HTTPBasicAuth # type: ignore # or HTTPDigestAuth, or OAuth1, etc.
from zeep import Client
from zeep.transports import Transport
from zeep.cache import SqliteCache
from typing import Optional, Dict
class APplusAppServerSettings:
"""
Einstellungen, mit welchem APplus App-Server sich verbunden werden soll.
"""
def __init__(self, appserver : str, appserverPort : int, user : str, env : Optional[str] = None):
self.appserver = appserver
self.appserverPort = appserverPort
self.user = user
self.env = env
class APplusWebServerSettings:
"""
Einstellungen, mit welchem APplus Web-Server sich verbunden werden soll.
"""
def __init__(self, baseurl:Optional[str]=None):
self.baseurl : Optional[str] = baseurl;
try:
assert (isinstance(self.baseurl, str))
if not (self.baseurl == None) and not (self.baseurl[-1] == "/"):
self.baseurl = self.baseurl + "/";
except:
pass
class APplusServerConnection:
"""Verbindung zu einem APplus APP-Server
:param settings: die Einstellungen für die Verbindung mit dem APplus Server
:type settings: APplusAppServerSettings
"""
def __init__(self, settings : APplusAppServerSettings) -> None:
userEnv = settings.user;
if (settings.env):
userEnv += "|" + settings.env
session = Session()
session.auth = HTTPBasicAuth(userEnv, "")
self.transport = Transport(cache=SqliteCache(), session=session)
# self.transport = Transport(session=session)
self.clientCache : Dict[str, Client] = {}
self.settings=settings;
self.appserverUrl = "http://" + settings.appserver + ":" + str(settings.appserverPort) + "/";
def getClient(self, package : str, name : str) -> Client:
"""Erzeugt einen zeep - Client.
Mittels dieses Clients kann die WSDL Schnittstelle angesprochen werden.
Wird als *package* "p2core" und als *name* "Table" verwendet und der
resultierende client "client" genannt, dann kann
z.B. mittels "client.service.getCompleteSQL(sql)" vom AppServer ein Vervollständigen
des SQLs angefordert werden.
:param package: das Packet, z.B. "p2core"
:type package: str
:param name: der Name im Packet, z.B. "Table"
:type package: string
:return: den Client
:rtype: Client
"""
url = package+"/"+name;
try:
return self.clientCache[url];
except:
fullClientUrl = self.appserverUrl + url + ".jws?wsdl"
client = Client(fullClientUrl, transport=self.transport)
self.clientCache[url] = client;
return client;

View File

@ -0,0 +1,59 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#-*- coding: utf-8 -*-
from typing import *
if TYPE_CHECKING:
from .applus import APplusServer
class APplusSysConf:
"""
SysConf Zugriff mit Cache über AppServer
:param server: die Verbindung zum Server
:type server: APplusServer
"""
def __init__(self, server : 'APplusServer') -> None:
self.client = server.getClient("p2system", "SysConf")
self.cache : Dict[str, type] = {}
def clearCache(self) -> None:
self.cache = {};
def _getGeneral(self, ty:str, f : Callable[[str, str], Any], module:str, name:str, useCache:bool) -> Any:
cacheKey = module + "/" + name + "/" + ty;
if useCache and cacheKey in self.cache:
return self.cache[cacheKey]
else:
v = f(module, name);
self.cache[cacheKey] = v;
return v;
def getString(self, module:str, name:str, useCache:bool=True) -> str:
return self._getGeneral("string", self.client.service.getString, module, name, useCache);
def getInt(self, module:str, name:str, useCache:bool=True) -> int:
return self._getGeneral("int", self.client.service.getInt, module, name, useCache);
def getDouble(self, module:str, name:str, useCache:bool=True) -> float:
return self._getGeneral("double", self.client.service.getDouble, module, name, useCache);
def getBoolean(self, module:str, name:str, useCache:bool=True) -> bool:
return self._getGeneral("boolean", self.client.service.getBoolean, module, name, useCache);
def getList(self, module : str, name:str, useCache:bool=True, sep:str=",") -> Optional[Sequence[str]]:
s = self.getString(module, name, useCache=useCache);
if (s == None or s == ""):
return None
return s.split(sep);

View File

@ -0,0 +1,335 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#-*- coding: utf-8 -*-
import lxml.etree as ET # type: ignore
from . import sql_utils
import datetime
from typing import *
if TYPE_CHECKING:
from .applus import APplusServer
def _formatValueForXMLRow(v : Any) -> str:
"""Hilfsfunktion zum Formatieren eines Wertes für XML"""
if (v is None):
return "";
if isinstance(v, (int, float)):
return str(v);
elif isinstance(v, str):
return v;
elif isinstance(v, datetime.datetime):
return v.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
elif isinstance(v, datetime.date):
return v.strftime("%Y-%m-%d")
elif isinstance(v, datetime.time):
return v.strftime("%H:%M:%S.%f")[:-3]
else:
return str(v)
class UseXmlRow:
"""
Klasse, die eine XML-Datei erzeugen kann, die mittels p2core.useXML
genutzt werden kann. Damit ist es möglich APplus BusinessObjekte zu
erzeugen, ändern und zu löschen. Im Gegensatz zu direkten DB-Zugriffen,
werden diese Anfragen über den APP-Server ausgeführt. Dabei werden
die von der Weboberfläche bekannten Checks und Änderungen ausgeführt.
Als sehr einfaches Beispiel wird z.B. INSDATE oder UPDDATE automatisch gesetzt.
Interessanter sind automatische Änderungen und Checks.
Bei der Benutzung wird zunächst ein Objekt erzeugt, dann evtl.
mittels :meth:`addField` Felder hinzugefügt und schließlich mittels
:meth:`exec` an den AppServer übergeben.
Normalerweise sollte die Klasse nicht direkt, sondern über Unterklassen
für das Einfügen, Ändern oder Löschen benutzt werden.
:param applus: Verbindung zu APplus
:type applus: APplusServer
:param table: die Tabelle
:type table: str
:param cmd: cmd-attribut der row, also ob es sich um ein Update, ein Insert oder ein Delete handelt
:type cmd: str
"""
def __init__(self, applus : 'APplusServer', table : str, cmd : str) -> None:
self.applus = applus
self.table = table
self.cmd = cmd
self.fields : Dict[str, Any] = {}
def __str__(self) -> str:
return self.toprettyxml()
def _buildXML(self) -> ET.Element :
"""Hilfsfunktion, die das eigentliche XML baut"""
row = ET.Element("row", cmd=self.cmd, table=self.table, nsmap={ "dt" : "urn:schemas-microsoft-com:datatypes"});
for name, value in self.fields.items():
child = ET.Element(name)
child.text = _formatValueForXMLRow(value)
row.append(child)
return row
def toprettyxml(self)->str:
"""
Gibt das formatierte XML aus. Dieses kann per useXML an den AppServer übergeben werden.
Dies wird mittels :meth:`exec` automatisiert.
"""
return ET.tostring(self._buildXML(), encoding = "unicode", pretty_print=True)
def getField(self, name:str) -> Any:
"""Liefert den Wert eines gesetzten Feldes"""
if name is None:
return None
name = sql_utils.normaliseDBfield(name);
if name in self.fields:
return self.fields[name]
elif name == "MANDANT":
return self.applus.scripttool.getMandant()
else:
return None
def checkFieldSet(self, name:Optional[str]) -> bool:
"""Prüft, ob ein Feld gesetzt wurde"""
if name is None:
return False
name = sql_utils.normaliseDBfield(name)
return (name in self.fields) or (name == "MANDANT")
def checkFieldsSet(self, *names : str) -> bool:
"""Prüft, ob alle übergebenen Felder gesetzt sind"""
for n in names:
if not (self.checkFieldSet(n)):
return False
return True
def addField(self, name:str|None, value:Any) -> None:
"""
Fügt ein Feld zum Row-Node hinzu.
:param name: das Feld
:type name: string
:param value: Wert des Feldes
"""
if name is None:
return
self.fields[sql_utils.normaliseDBfield(name)] = value
def addTimestampField(self, id:int, ts:Optional[bytes]=None) -> None:
"""
Fügt ein Timestamp-Feld hinzu. Wird kein Timestamp übergeben, wird mittels der ID der aktuelle
Timestamp aus der DB geladen. Dabei kann ein Fehler auftreten.
Ein Timestamp-Feld ist für Updates und das Löschen nötig um sicherzustellen, dass die richtige
Version des Objekts geändert oder gelöscht wird. Wird z.B. ein Objekt aus der DB geladen, inspiziert
und sollen dann Änderungen gespeichert werden, so sollte der Timestamp des Ladens übergeben werden.
So wird sichergestellt, dass nicht ein anderer User zwischenzeitlich Änderungen vornahm. Ist dies
der Fall, wird dann bei "exec" eine Exception geworfen.
:param id: DB-id des Objektes dessen Timestamp hinzugefügt werden soll
:type id: string
:param ts: Fester Timestamp der verwendet werden soll, wenn None wird der Timestamp aus der DB geladen.
:type ts: bytes
"""
if ts is None:
ts = self.applus.dbQuerySingleValue("select timestamp from " + self.table + " where id = ?", id);
if ts:
self.addField("timestamp", ts.hex());
else:
raise Exception("kein Eintrag in Tabelle '" + self.table + " mit ID " + str(id) + " gefunden")
def addTimestampIDFields(self, id:int, ts:Optional[bytes]=None) -> None:
"""
Fügt ein Timestamp-Feld sowie ein Feld id hinzu. Wird kein Timestamp übergeben, wird mittels der ID der aktuelle
Timestamp aus der DB geladen. Dabei kann ein Fehler auftreten. Intern wird :meth:`addTimestampField` benutzt.
:param id: DB-id des Objektes dessen Timestamp hinzugefügt werden soll
:type id: string
:param ts: Fester Timestamp der verwendet werden soll, wenn None wird der Timestamp aus der DB geladen.
:type ts: bytes
"""
self.addField("id", id)
self.addTimestampField(id, ts=ts);
def exec(self) -> Any:
"""
Führt die UseXmlRow mittels useXML aus. Je nach Art der Zeile wird etwas zurückgeliefert oder nicht.
In jedem Fall kann eine Exception geworfen werden.
"""
return self.applus.useXML(self.toprettyxml());
class UseXmlRowInsert(UseXmlRow):
"""
Klasse, die eine XML-Datei für das Einfügen eines neuen Datensatzes erzeugen kann.
:param applus: Verbindung zu APplus
:type applus: APplusServer
:param table: die Tabelle
:type table: string
"""
def __init__(self, applus:'APplusServer', table:str) -> None:
super().__init__(applus, table, "insert");
def insert(self) -> int:
"""
Führt das insert aus. Entweder wird dabei eine Exception geworfen oder die ID des neuen Eintrags zurückgegeben.
Dies ist eine Umbenennung von :meth:`exec`.
"""
return super().exec();
class UseXmlRowDelete(UseXmlRow):
"""
Klasse, die eine XML-Datei für das Löschen eines neuen Datensatzes erzeugen kann.
Die Felder `id` und `timestamp` werden automatisch gesetzt.
Dies sind die einzigen Felder, die gesetzt werden sollten.
:param applus: Verbindung zu APplus
:type applus: APplusServer
:param table: die Tabelle
:type table: string
:param id: die zu löschende ID
:type id: int
:param ts: wenn gesetzt, wird dieser Timestamp verwendet, sonst der aktuelle aus der DB
:type ts: bytes optional
"""
def __init__(self, applus:'APplusServer', table:str, id:int, ts:Optional[bytes]=None) -> None:
super().__init__(applus, table, "delete");
self.addTimestampIDFields(id, ts=ts);
def delete(self) -> None:
"""
Führt das delete aus. Evtl. wird dabei eine Exception geworfen.
Dies ist eine Umbenennung von :meth:`exec`.
"""
super().exec();
class UseXmlRowUpdate(UseXmlRow):
"""
Klasse, die eine XML-Datei für das Ändern eines neuen Datensatzes, erzeugen kann.
Die Felder `id` und `timestamp` werden automatisch gesetzt.
:param applus: Verbindung zu APplus
:type applus: APplusServer
:param table: die Tabelle
:type table: string
:param id: die ID des zu ändernden Datensatzes
:type id: int
:param ts: wenn gesetzt, wird dieser Timestamp verwendet, sonst der aktuelle aus der DB
:type ts: bytes optional
"""
def __init__(self, applus : 'APplusServer', table : str, id : int, ts:Optional[bytes]=None) -> None:
super().__init__(applus, table, "update");
self.addTimestampIDFields(id, ts=ts);
def update(self) -> None:
"""
Führt das update aus. Evtl. wird dabei eine Exception geworfen.
Dies ist eine Umbenennung von :meth:`exec`.
"""
super().exec();
class UseXmlRowInsertOrUpdate(UseXmlRow):
"""
Klasse, die eine XML-Datei für das Einfügen oder Ändern eines neuen Datensatzes, erzeugen kann.
Die Methode `checkExists` erlaubt es zu prüfen, ob ein Objekt bereits existiert. Dafür werden die
gesetzten Felder mit den Feldern aus eindeutigen Indices verglichen. Existiert ein Objekt bereits, wird
ein Update ausgeführt, ansonsten ein Insert. Bei Updates werden die Felder `id` und `timestamp`
automatisch gesetzt.
:param applus: Verbindung zu APplus
:type applus: APplusServer
:param table: die Tabelle
:type table: string
"""
def __init__(self, applus : 'APplusServer', table : str) -> None:
super().__init__(applus, table, "");
def checkExists(self) -> int|None:
"""
Prüft, ob der Datensatz bereits in der DB existiert.
Ist dies der Fall, wird die ID geliefert, sonst None
"""
# Baue Bedingung
cond = sql_utils.SqlConditionOr();
for idx, fs in self.applus.getUniqueFieldsOfTable(self.table).items():
if (self.checkFieldsSet(*fs)):
condIdx = sql_utils.SqlConditionAnd();
for f in fs:
condIdx.addConditionFieldEq(f, self.getField(f))
cond.addCondition(condIdx)
sql = sql_utils.SqlStatementSelect(self.table, "id")
sql.where = cond
return self.applus.dbQuerySingleValue(sql)
def insert(self) -> int:
"""Führt ein Insert aus. Existiert das Objekt bereits, wird eine Exception geworfen."""
r = UseXmlRowInsert(self.applus, self.table)
for k, v in self.fields.items():
r.addField(k, v)
return r.insert();
def update(self, id:Optional[int]=None, ts:Optional[bytes]=None) -> int:
"""Führt ein Update aus. Falls ID oder Timestamp nicht übergeben werden, wird
nach einem passenden Objekt gesucht. Existiert das Objekt nicht, wird eine Exception geworfen."""
if id is None:
id = self.checkExists()
if id is None:
raise Exception("Update nicht möglich, da kein Objekt für Update gefunden.")
r = UseXmlRowUpdate(self.applus, self.table, id, ts=ts)
for k, v in self.fields.items():
r.addField(k, v)
r.update();
return id
def exec(self) -> int:
"""
Führt entweder ein Update oder ein Insert durch. Dies hängt davon ab, ob das Objekt bereits in
der DB existiert. In jedem Fall wird die ID des erzeugten oder geänderten Objekts geliefert.
"""
id = self.checkExists();
if id == None:
return self.insert()
else:
return self.update(id=id)
def updateOrInsert(self) -> int:
"""
Führt das update oder das insert aus. Evtl. wird dabei eine Exception geworfen.
Dies ist eine Umbenennung von :meth:`exec`.
Es wird die ID des Eintrages geliefert
"""
return self.exec();

571
src/PyAPplus64/duplicate.py Normal file
View File

@ -0,0 +1,571 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#-*- coding: utf-8 -*-
"""
Dupliziert ein oder mehrere APplus Business-Objekte
"""
from . import sql_utils
from . import applus_db
from . import applus_usexml
from .applus import APplusServer
import pyodbc # type: ignore
import traceback
import logging
from typing import *
logger = logging.getLogger(__name__);
noCopyFields = sql_utils.normaliseDBfieldSet({"INSUSER", "UPDDATE", "TIMESTAMP", "MANDANT", "GUID", "ID", "TIMESTAMP_A", "INSDATE", "ID_A", "UPDUSER"})
"""Menge von Feld-Namen, die nie kopiert werden sollen."""
def getFieldsToCopyForTable(server : APplusServer, table : str, force:bool=True) -> Set[str]:
"""
Bestimmt die für eine Tabelle zu kopierenden Spalten. Dazu wird in den XML-Definitionen geschaut.
Ist dort 'include' hinterlegt, werden diese Spalten verwendet. Ansonsten alle nicht generierten Spalten,
ohne die 'exclude' Spalten. In jedem Fall werden Spalten wie "ID", die nie kopiert werden sollten, entfernt.
"""
xmlDefs = server.scripttool.getXMLDefinitionObj(table)
fields : Set[str]
if (xmlDefs is None):
if not force:
raise Exception ("Keine XML-Definitionen für '{}' gefunden".format(table));
(fields, excl) = (set(), True)
else:
(fields, excl) = xmlDefs.getDuplicate()
if not excl:
return fields.difference(noCopyFields)
allFields = server.getTableFields(table, isComputed=False)
return allFields.difference(fields).difference(noCopyFields);
class FieldsToCopyForTableCache():
"""
Cache für welche Felder für welche Tabelle kopiert werden sollen
"""
def __init__(self, server : APplusServer) -> None:
self.server = server
self.cache : Dict[str, Set[str]]= {}
def getFieldsToCopyForTable(self, table : str) -> Set[str]:
"""
Bestimmt die für eine Tabelle zu kopierenden Spalten. Dazu wird in den XML-Definitionen geschaut.
Ist dort 'include' hinterlegt, werden diese Spalten verwendet. Ansonsten alle nicht generierten Spalten,
ohne die 'exclude' Spalten. In jedem Fall werden Spalten wie "ID", die nie kopiert werden sollten, entfernt.
"""
if (table is None):
return None
t = table.upper()
fs = self.cache.get(t, None)
if not (fs is None):
return fs
else:
fs = getFieldsToCopyForTable(self.server, t)
self.cache[t] = fs
return fs
def initFieldsToCopyForTableCacheIfNeeded(server : APplusServer, cache : Optional[FieldsToCopyForTableCache]) -> FieldsToCopyForTableCache:
"""
Hilfsfunktion, die einen Cache erzeugt, falls dies noch nicht geschehen ist.
"""
if cache is None:
return FieldsToCopyForTableCache(server)
else:
return cache;
class DuplicateBusinessObject():
"""
Klasse, die alle Daten zu einem BusinessObject speichert und zum Duplizieren dieses Objektes dient.
Dies beinhaltet Daten zu abhängigen Objekten sowie die Beziehung zu diesen Objekten. Zu einem Artikel
wird z.B. der Arbeitsplan gespeichert, der wiederum Arbeitsplanpositionen enthält. Als Beziehung ist u.a.
hinterlegt, dass das Feld "APLAN" der Arbeitsplans dem Feld "ARTIKEL" des Artikels entsprechen muss und dass
"APLAN" aus den Positionen, "APLAN" aus dem APlan entsprichen muss. So kann beim Duplizieren ein
anderer Name des Artikels gesetzt werden und automatisch die Felder der abhängigen Objekte angepasst werden.
Einige Felder der Beziehung sind dabei statisch, d.h. können direkt aus den zu speichernden Daten abgelesen werden.
Andere Felder sind dynamisch, d.h. das Parent-Objekt muss in der DB angelegt werden, damit ein solcher dynamischer Wert erstellt
und geladen werden kann. Ein typisches Beispiel für ein dynamisches Feld ist "GUID".
"""
def __init__(self, table : str, fields : Dict[str, Any], fieldsNotCopied:Dict[str, Any]={}, allowUpdate:bool=False) -> None:
self.table = table
"""für welche Tabelle ist das BusinessObject"""
self.fields = fields
"""die Daten"""
self.fieldsNotCopied = fieldsNotCopied
"""Datenfelder, die im Original vorhanden sind, aber nicht kopiert werden sollen"""
self.dependentObjs : List[Dict[str, Any]] = []
"""Abhängige Objekte"""
self.allowUpdate = allowUpdate
"""Erlaube Updates statt Fehlern, wenn Objekt schon in DB existiert"""
def addDependentBusinessObject(self, dObj : Optional['DuplicateBusinessObject'], *args : Tuple[str, str]) -> None:
"""
Fügt ein neues Unterobjekt zum DuplicateBusinessObject hinzu.
Dabei handelt es sich selbst um ein DuplicateBusinessObject, das zusammen mit dem
Parent-Objekt dupliziert werden sollen. Zum Beispiel sollen zu einem
Auftrag auch die Positionen dupliziert werden.
Zusätzlich zum Objekt selbst können mehrere (keine, eine oder viele)
Paare von Feldern übergeben werden. Ein Paar ("pf", "sf") verbindet das
Feld "pf" des Parent-Objekts mit dem Feld "sf" des Sub-Objekts. So ist es möglich,
Werte des Parent-Objekts zu ändern und diese Änderungen für Sub-Objekte zu übernehmen.
Üblicherweise muss zum Beispiel die Nummer des Hauptobjekts geändert werden. Die
gleiche Änderung ist für alle abhängigen Objekte nötig, damit die neuen Objekte sich auf das
Parent-Objekt beziehen.
:param dObj: das Unter-Objekt
:type dObj: DuplicateBusinessObject
:param args: Liste von Tupeln, die Parent- und Sub-Objekt-Felder miteinander verbinden
"""
if (dObj is None):
return
args2= {}
for f1, f2 in args:
args2[sql_utils.normaliseDBfield(f1)] = sql_utils.normaliseDBfield(f2)
self.dependentObjs.append({
"dependentObj" : dObj,
"connection" : args2
})
def getField(self, field:str, onlyCopied:bool=False) -> Any:
"""
Schlägt den Wert eines Feldes nach. Wenn onlyCopied gesetzt ist, werden nur Felder zurückgeliefert, die auch kopiert
werden sollen.
"""
f = sql_utils.normaliseDBfield(field)
if (f in self.fields):
return self.fields[f]
if (not onlyCopied) and (f in self.fieldsNotCopied):
return self.fieldsNotCopied[f]
return None
def insert(self, server : APplusServer) -> applus_db.DBTableIDs:
"""
Fügt alle Objekte zur DB hinzu. Es wird die Menge der IDs der erzeugten
Objekte gruppiert nach Tabellen erzeugt. Falls ein Datensatz schon
existiert, wird dieser entweder aktualisiert oder eine Fehlermeldung
geworfen. Geliefert wird die Menge aller Eingefügten Objekte mit ihrer ID.
"""
res = applus_db.DBTableIDs()
def insertDO(do : 'DuplicateBusinessObject') -> Optional[int]:
nonlocal res
insertRow : applus_usexml.UseXmlRow
if do.allowUpdate:
insertRow = server.mkUseXMLRowInsertOrUpdate(do.table);
else:
insertRow = server.mkUseXMLRowInsert(do.table);
for f, v in do.fields.items():
insertRow.addField(f, v)
try:
id = insertRow.exec()
res.add(do.table, id)
return id
except:
msg = traceback.format_exc();
logger.error("Exception inserting BusinessObjekt: %s\n%s", str(insertRow), msg)
return None
def insertDep(do : 'DuplicateBusinessObject', doID : int, so : 'DuplicateBusinessObject', connect : Dict[str,str]) -> None:
nonlocal res
# Abbruch, wenn do nicht eingefügt wurde
if (doID is None):
return
# copy known fields of connect
connectMissing = {}
for fd, fs in connect.items():
if fd in do.fields:
so.fields[fs] = do.fields[fd]
else:
connectMissing[fd] = fs
# load missing fields from DB
if len(connectMissing) > 0:
sql = sql_utils.SqlStatementSelect(do.table);
sql.where.addConditionFieldEq("id", doID)
for fd in connectMissing:
sql.addFields(fd)
rd = server.dbQuerySingleRowDict(sql)
if not (rd is None):
for fd, fs in connectMissing.items():
so.fields[fs] = rd[fd]
# real insert
id = insertDO(so)
if not (id is None):
insertDeps(so, id)
def insertDeps(do : 'DuplicateBusinessObject', doID : int) -> None:
for so in do.dependentObjs:
insertDep(do, doID, so["dependentObj"], so["connection"])
topID = insertDO(self)
if not (topID is None):
insertDeps(self, topID)
return res
def setFields(self, upds : Dict[str, Any]) -> None:
"""
Setzt Felder des DuplicateBusinessObjektes und falls nötig seiner Unterobjekte.
So kann zum Beispiel die Nummer vor dem Speichern geändert werden.
:param upds: Dictionary mit zu setzenden Werten
"""
def setFieldsInternal(dobj : 'DuplicateBusinessObject', upds : Dict[str, Any]) -> None:
# setze alle Felder des Hauptobjekts
for f, v in upds.items():
dobj.fields[f] = v
# verarbeite alle Subobjekte
for su in dobj.dependentObjs:
subupds = {}
for fp, fs in su["connection"].items():
if fp in upds:
subupds[fs] = upds[fp]
setFieldsInternal(su["dependentObj"], subupds)
updsNorm : Dict[str, Any] = {}
for f, v in upds.items():
updsNorm[sql_utils.normaliseDBfield(f)] = v
setFieldsInternal(self, updsNorm)
def _loadDBDuplicateBusinessObjectDict(
server : APplusServer,
table : str,
row : pyodbc.Row,
cache:Optional[FieldsToCopyForTableCache]=None,
allowUpdate:bool=False) -> Optional[DuplicateBusinessObject]:
"""
Hilfsfunktion, die ein DuplicateBusinessObjekt erstellt. Die Daten stammen aus
einer PyOdbc Zeile. So ist es möglich, mit nur einem SQL-Statement,
mehrere DuplicateBusinessObjekte zu erstellen.
:param server: Verbindung zum APP-Server, benutzt zum Nachschlagen der zu kopierenden Felder
:param table: Tabelle für das neue DuplicateBusinessObjekt
:param row: die Daten als PyODBC Zeile
:param cache: Cache, so dass benötigte Felder nicht immer wieder neu berechnet werden müssen
:return: das neue DuplicateBusinessObject
"""
table = table.upper();
def getFieldsToCopy() -> Set[str]:
if cache is None:
return getFieldsToCopyForTable(server, table)
else:
return cache.getFieldsToCopyForTable(table)
def getFields() -> Tuple[Dict[str, Any], Dict[str, Any]]:
ftc = getFieldsToCopy()
fields = {}
fieldsNotCopied = {}
for f, v in applus_db.row_to_dict(row).items():
f = sql_utils.normaliseDBfield(f);
if f in ftc:
fields[f] = v
else:
fieldsNotCopied[f] = v
return (fields, fieldsNotCopied)
if (row is None):
return None
(fields, fieldsNotCopied) = getFields()
return DuplicateBusinessObject(table, fields, fieldsNotCopied=fieldsNotCopied, allowUpdate=allowUpdate)
def loadDBDuplicateBusinessObject(
server : APplusServer,
table : str,
cond : sql_utils.SqlCondition,
cache : Optional[FieldsToCopyForTableCache]=None,
allowUpdate : bool = False) -> Optional[DuplicateBusinessObject]:
"""
Läd ein einzelnes DuplicateBusinessObjekt aus der DB. Die Bedingung sollte dabei
einen eindeutigen Datensatz auswählen. Werden mehrere zurückgeliefert, wird ein
zufälliger ausgewählt. Wird kein Datensatz gefunden, wird None geliefert.
:param server: Verbindung zum APP-Server, benutzt zum Nachschlagen der zu kopierenden Felder
:type server: APplusServer
:param table: Tabelle für das neue DuplicateBusinessObjekt
:type table: str
:param cond: SQL-Bedingung zur Auswahl eines Objektes
:type cond: sql_utils.SqlCondition
:param cache: Cache, so dass benötigte Felder nicht immer wieder neu berechnet werden müssen
:type cache: Optional[FieldsToCopyForTableCache]
:param allowUpdate: ist Update statt Insert erlaubt?
:type allowUpdate: bool
:return: das neue DuplicateBusinessObject
:rtype: Optional[DuplicateBusinessObject]
"""
table = table.upper();
def getRow() -> pyodbc.Row:
sql = sql_utils.SqlStatementSelect(table)
sql.setTop(1)
sql.where.addCondition(cond);
return server.dbQuerySingleRow(sql)
return _loadDBDuplicateBusinessObjectDict(server, table, getRow(), cache=cache, allowUpdate=allowUpdate);
def loadDBDuplicateBusinessObjectSimpleCond(
server : APplusServer,
table : str,
field : str,
value : Optional[Union[sql_utils.SqlValue, bool]],
cache : Optional[FieldsToCopyForTableCache]=None,
allowUpdate : bool = False) -> Optional[DuplicateBusinessObject]:
"""
Wrapper für loadDBDuplicateBusinessObject, das eine einfache Bedingung benutzt,
bei der ein Feld einen bestimmten Wert haben muss.
:param server: Verbindung zum APP-Server, benutzt zum Nachschlagen der zu kopierenden Felder
:type server: APplusServer
:param table: Tabelle für das neue DuplicateBusinessObjekt
:type table: str
:param field: Feld für Bedingung
:type field: str
:param value: Wert des Feldes für Bedingung
:param cache: Cache, so dass benötigte Felder nicht immer wieder neu berechnet werden müssen
:type cache: Optional[FieldsToCopyForTableCache]
:return: das neue DuplicateBusinessObject
:rtype: Optional[DuplicateBusinessObject]
"""
cond = sql_utils.SqlConditionFieldEq(field, value)
return loadDBDuplicateBusinessObject(server, table, cond, cache=cache, allowUpdate=allowUpdate)
def loadDBDuplicateBusinessObjects(
server : APplusServer,
table : str,
cond : sql_utils.SqlCondition,
cache : Optional[FieldsToCopyForTableCache]=None,
allowUpdate : bool = False) -> Sequence[DuplicateBusinessObject]:
"""
Läd eine Liste von DuplicateBusinessObjekten aus der DB. Die Bedingung kann mehrere Datensätze auswählen.
:param server: Verbindung zum APP-Server, benutzt zum Nachschlagen der zu kopierenden Felder
:type server: APplusServer
:param table: Tabelle für das neue DuplicateBusinessObjekt
:type table: str
:param cond: SQL-Bedingung zur Auswahl eines Objektes
:type cond: sql_utils.SqlCondition
:param cache: Cache, so dass benötigte Felder nicht immer wieder neu berechnet werden müssen
:type cache: Optional[FieldsToCopyForTableCache]
:return: Liste der neuen DuplicateBusinessObjects
:rtype: Sequence[DuplicateBusinessObject]
"""
table = table.upper()
cache = initFieldsToCopyForTableCacheIfNeeded(server, cache)
def processRow(r : pyodbc.Row) -> Optional[DuplicateBusinessObject]:
return _loadDBDuplicateBusinessObjectDict(server, table, r, cache=cache, allowUpdate=allowUpdate)
sql = sql_utils.SqlStatementSelect(table)
sql.where.addCondition(cond)
return server.dbQueryAll(sql, apply=processRow)
def loadDBDuplicateBusinessObjectsSimpleCond(
server : APplusServer,
table : str,
field : str,
value : Optional[Union[sql_utils.SqlValue, bool]],
cache : Optional[FieldsToCopyForTableCache]=None,
allowUpdate : bool = False) -> Sequence[DuplicateBusinessObject]:
"""
Wrapper für loadDBDuplicateBusinessObjects, das eine einfache Bedingung benutzt,
bei der ein Feld einen bestimmten Wert haben muss.
:param server: Verbindung zum APP-Server, benutzt zum Nachschlagen der zu kopierenden Felder
:type server: APplusServer
:param table: Tabelle für das neue DuplicateBusinessObjekt
:type table: str
:param field: Feld für Bedingung
:param value: Wert des Feldes für Bedingung
:param cache: Cache, so dass benötigte Felder nicht immer wieder neu berechnet werden müssen
:type cache: Optional[FieldsToCopyForTableCache]
:return: Liste der neuen DuplicateBusinessObjects
:rtype: Sequence[DuplicateBusinessObject]
"""
cond = sql_utils.SqlConditionFieldEq(field, value)
return loadDBDuplicateBusinessObjects(server, table, cond, cache=cache, allowUpdate=allowUpdate)
# Im Laufe der Zeit sollten load-Funktionen für verschiedene BusinessObjekte
# erstellt werden. Dies erfolgt immer, wenn eine solche Funktion wirklich
# benutzt werden soll
def loadDBDuplicateAPlan(
server : APplusServer,
aplan : str,
cache:Optional[FieldsToCopyForTableCache]=None) -> Optional[DuplicateBusinessObject]:
"""
Erstelle DuplicateBusinessObject für einzelnen Arbeitsplan.
:param server: Verbindung zum APP-Server, benutzt zum Nachschlagen der zu kopierenden Felder
:type server: APplusServer
:param aplan: Aplan, der kopiert werden soll.
:type aplan: str
:param cache: Cache, so dass benötigte Felder nicht immer wieder neu berechnet werden müssen
:type cache: Optional[FieldsToCopyForTableCache]
:return: das neue DuplicateBusinessObject
:rtype: DuplicateBusinessObject
"""
cache = initFieldsToCopyForTableCacheIfNeeded(server, cache);
boMain = loadDBDuplicateBusinessObjectSimpleCond(server, "aplan", "APLAN", aplan, cache=cache)
if boMain is None:
return None
for so in loadDBDuplicateBusinessObjectsSimpleCond(server, "aplanpos", "APLAN", aplan, cache=cache):
boMain.addDependentBusinessObject(so, ("aplan", "aplan"))
return boMain
def loadDBDuplicateStueli(server : APplusServer, stueli : str, cache:Optional[FieldsToCopyForTableCache]=None) -> Optional[DuplicateBusinessObject]:
"""
Erstelle DuplicateBusinessObject für einzelne Stückliste.
:param server: Verbindung zum APP-Server, benutzt zum Nachschlagen der zu kopierenden Felder
:type server: APplusServer
:param stueli: Stückliste, die kopiert werden soll.
:type stueli: str
:param cache: Cache, so dass benötigte Felder nicht immer wieder neu berechnet werden müssen
:type cache: Optional[FieldsToCopyForTableCache]
:return: das neue DuplicateBusinessObject
:rtype: Optional[DuplicateBusinessObject]
"""
cache = initFieldsToCopyForTableCacheIfNeeded(server, cache);
boMain = loadDBDuplicateBusinessObjectSimpleCond(server, "stueli", "stueli", stueli, cache=cache)
if boMain is None:
return None
for so in loadDBDuplicateBusinessObjectsSimpleCond(server, "stuelipos", "stueli", stueli, cache=cache):
boMain.addDependentBusinessObject(so, ("stueli", "stueli"))
return boMain
def addSachgruppeDependentObjects(
do : DuplicateBusinessObject,
server : APplusServer,
cache:Optional[FieldsToCopyForTableCache]=None) -> None:
"""
Fügt Unterobjekte hinzu, die die Sachgruppenwerte kopieren.
:param do: zu erweiterndes DuplicateBusinessObject
:param server: Verbindung zum APP-Server, benutzt zum Nachschlagen der zu kopierenden Felder
:type server: APplusServer
:param cache: Cache, so dass benötigte Felder nicht immer wieder neu berechnet werden müssen
:type cache: Optional[FieldsToCopyForTableCache]
"""
cache = initFieldsToCopyForTableCacheIfNeeded(server, cache);
klasse = do.fields.get(sql_utils.normaliseDBfield("SACHGRUPPENKLASSE"), None)
if (klasse == None):
# keine Klasse gesetzt, nichts zu kopieren
return
# bestimme alle Gruppen
def loadGruppen() -> Sequence[str]:
sql = sql_utils.SqlStatementSelect("sachgruppenklassepos", "sachgruppe")
sql.where.addConditionFieldEq("sachgruppenklasse", klasse)
sql.where.addConditionFieldEq("tabelle", do.table)
return server.dbQueryAll(sql, apply=lambda r: r.sachgruppe)
gruppen = loadGruppen();
# Gruppe bearbeiten
def processGruppen() -> None:
cond = sql_utils.SqlConditionAnd()
cond.addConditionFieldEq("tabelle", do.table)
cond.addConditionFieldEq("instanzguid", do.getField("guid"))
cond.addConditionFieldEq("sachgruppenklasse", klasse)
cond.addConditionFieldIn("sachgruppe", gruppen)
cond.addConditionFieldStringNotEmpty("wert")
for so in loadDBDuplicateBusinessObjects(server, "sachwert", cond, cache=cache, allowUpdate=True):
do.addDependentBusinessObject(so, ("guid", "instanzguid"))
processGruppen()
def loadDBDuplicateArtikel(
server : APplusServer,
artikel : str,
cache:Optional[FieldsToCopyForTableCache]=None,
dupAplan:bool=True,
dupStueli:bool=True) -> Optional[DuplicateBusinessObject]:
"""
Erstelle DuplicateBusinessObject für einzelnen Artikel.
:param server: Verbindung zum APP-Server, benutzt zum Nachschlagen der zu kopierenden Felder
:type server: APplusServer
:param artikel: Artikel, der kopiert werden soll
:type artikel: str
:param cache: Cache, so dass benötigte Felder nicht immer wieder neu berechnet werden müssen
:type cache: Optional[FieldsToCopyForTableCache]
:param dupAplan: Arbeitsplan duplizieren?
:type dupAplan: bool optional
:param dupStueli: Stückliste duplizieren?
:type dupStueli: bool optional
:return: das neue DuplicateBusinessObject
:rtype: DuplicateBusinessObject
"""
cache = initFieldsToCopyForTableCacheIfNeeded(server, cache);
boArt = loadDBDuplicateBusinessObjectSimpleCond(server, "artikel", "ARTIKEL", artikel, cache=cache)
if boArt is None:
return None
addSachgruppeDependentObjects(boArt, server, cache=cache)
if dupAplan:
boAplan = loadDBDuplicateAPlan(server, artikel, cache=cache)
boArt.addDependentBusinessObject(boAplan, ("artikel", "aplan"))
if dupStueli:
boStueli = loadDBDuplicateStueli(server, artikel, cache=cache)
boArt.addDependentBusinessObject(boStueli, ("artikel", "stueli"))
return boArt

131
src/PyAPplus64/pandas.py Normal file
View File

@ -0,0 +1,131 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
"""Pandas Interface für PyAPplus64."""
from typing import Annotated as Ann
import pandas as pd # type: ignore
from pandas._typing import AggFuncType, FilePath, WriteExcelBuffer # type: ignore
import sqlalchemy
import traceback
from .applus import APplusServer
from .applus import sql_utils
from typing import *
def createSqlAlchemyEngine(server : APplusServer) -> sqlalchemy.Engine:
"""Erzeugt eine SqlAlchemy-Engine für die Verbindung zur DB."""
return sqlalchemy.create_engine(sqlalchemy.engine.URL.create("mssql+pyodbc", query={"odbc_connect": server.db_settings.getConnectionString()}))
def pandasReadSql(
server : APplusServer,
sql : sql_utils.SqlStatement,
raw:bool=False,
engine:Optional[sqlalchemy.Engine]=None) -> pd.DataFrame:
"""Wrapper für pd.read_sql für sqlalchemy-engine.
:param server: APplusServer für Datenbankverbindung und complete-SQL
:type server: APplusServer
:param sql: das SQL-statement
"""
if engine is None:
engine = createSqlAlchemyEngine(server);
with engine.connect() as conn:
return pd.read_sql(sqlalchemy.text(server.completeSQL(sql, raw=raw)), conn)
def _createHyperLinkGeneral(genOrg : Callable[[], str|int|float], genLink: Callable[[], str]) -> str|int|float:
"""
Hilfsfunktion zum Generieren eines Excel-Links.
:param genLink: Funktion, die Parameter aufgerufen wird und einen Link generiert
"""
org:str|int|float=""
org2:str|int|float
try:
org = genOrg();
if not org:
return org
else :
if isinstance(org, (int, float)):
org2 = org;
else:
org2 = "\"" + str(org).replace("\"", "\"\"") + "\""
return "=HYPERLINK(\"{}\", {})".format(genLink(), org2)
except:
msg = traceback.format_exc();
print ("Exception: {}".format(msg))
return org
def mkDataframeColumn(df : pd.DataFrame, makeValue : AggFuncType) -> pd.Series:
"""
Erzeugt für alle Zeilen eines Dataframes eine neuen Wert. Dies wird benutzt, um eine Spalte zu berechnen.
Diese kann eine Originalspalte ersetzen, oder neu hinzugefügt werden.
:param df: der Dataframe
:param makeValue: Funktion, die eine Zeile als Parameter bekommt und den neuen Wert berechnet
"""
def mkValueWrapper(r): # type: ignore
try:
return makeValue(r)
except:
msg = traceback.format_exc();
print ("Exception: {}".format(msg))
return ""
if (len(df.index) > 0):
return df.apply(mkValueWrapper, axis=1)
else:
return df.apply(lambda r: "", axis=1);
def mkHyperlinkDataframeColumn(df : pd.DataFrame, makeOrig : AggFuncType, makeLink : Callable[[Any], str]) -> pd.Series :
"""
Erzeugt für alle Zeilen eines Dataframes einen Hyperlink. Dies wird benutzt, um eine Spalte mit einem Hyperlink zu berechnen.
Diese kann eine Originalspalte ersetzen, oder neu hinzugefügt werden.
:param df: der Dataframe
:param makeOrig: Funktion, die eine Zeile als Parameter bekommt und den Wert berechnet, der angezeigt werden soll
:param makeLink: Funktion, die eine Zeile als Parameter bekommt und den Link berechnet
"""
if (len(df.index) > 0):
return df.apply(lambda r: _createHyperLinkGeneral(lambda : makeOrig(r), lambda : makeLink(r)), axis=1)
else:
return df.apply(lambda r: "", axis=1);
def exportToExcel(
filename:FilePath | WriteExcelBuffer | pd.ExcelWriter,
dfs : Sequence[Tuple[pd.DataFrame, str]],
addTable:bool=True) -> None:
"""
Schreibt eine Menge von Dataframes in eine Excel-Tabelle
:param filename: Name der Excel-Datei
:param dfs: Liste von Tupeln aus DataFrames und Namen von Sheets.
"""
with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
for (df, name) in dfs:
df.to_excel(writer, sheet_name=name, index=False, header=True)
ws = writer.sheets[name]
# Table
if addTable:
(max_row, max_col) = df.shape
if max_row > 0 and max_col > 0:
column_settings = [{'header': column} for column in df.columns]
ws.add_table(0, 0, max_row, max_col - 1, {'columns': column_settings})
# Spaltenbreiten anpassen
ws.autofit();

0
src/PyAPplus64/py.typed Normal file
View File

813
src/PyAPplus64/sql_utils.py Normal file
View File

@ -0,0 +1,813 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#-*- coding: utf-8 -*-
"""
Diese Datei enthält Funktionen für den Bau von SQL Statements, besonders
SELECT-Statements. Es gibt viel ausgefeiltere Methoden für die Erstellung von
SQL Statements. APplus benötigt jedoch die Statements als Strings, die dann an
APplus für Änderungen und erst danach an die DB geschickt werden. Dies erschwert
die Nutzung von Tools wie SqlAlchemy.
Hier werden einfache Hilfsfunktionen, die auf Strings basieren, zur Verfügung
gestellt. PyODBC erlaubt Parameter (dargestellt als '?') in SQL Statements, die
dann beim Aufruf gefüllt werden. Dies funktioniert auch im Zusammenspiel mit
APplus. Oft ist es sinnvoll, solche Parameter zu verwenden.
"""
from __future__ import annotations
import datetime
from typing import *
def normaliseDBfield(f : str) -> str:
"""Normalisiert die Darstellung eines DB-Feldes"""
return str(f).upper();
def normaliseDBfieldSet(s : Set[str]) -> Set[str]:
"""Normalisiert eine Menge von DB-Feldern"""
return {normaliseDBfield(f) for f in s}
def normaliseDBfieldList(l : Sequence[str]) -> Sequence[str]:
"""Normalisiert eine Menge von DB-Feldern"""
return [normaliseDBfield(f) for f in l]
class SqlField():
"""
Wrapper um SQL Feldnamen, die die Formatierung erleichtern
:param fn: der Feldname
:type fn: str
"""
def __init__(self, fn : str):
self.field = normaliseDBfield(fn);
def __str__(self) -> str:
return self.field;
class SqlFixed():
"""
Wrapper um Strings, die ohne Änderung in SQL übernommen werden
:param s: der string
:type s: str
"""
def __init__(self, s : str):
self.s = str(s);
def __str__(self) -> str:
return self.s;
class SqlDateTime():
"""
Wrapper um DateTime, die die Formatierung erleichtern
:param dt: der Zeitpunkt
:type dt: Union[datetime.datetime, datetime.date]
"""
def __init__(self, dt:Union[datetime.datetime, datetime.date]=datetime.datetime.now()) -> None:
self.value = dt;
def __str__(self) -> str:
# %f formatiert mit 6 Stellen, also microseconds. Es werden aber nur
# 3 Stellen unterstützt, daher werden 3 weggeworfen.
return self.value.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
class SqlDate():
"""
Wrapper um DateTime, die die Formatierung erleichtern
:param d: das Datum
:type d: Union[datetime.datetime, datetime.date]
"""
def __init__(self, d:Union[datetime.datetime, datetime.date]=datetime.datetime.now()) -> None:
self.value = d;
def __str__(self) -> str:
return self.value.strftime("%Y%m%d");
class SqlTime():
"""
Wrapper um DateTime, die die Formatierung erleichtern
:param t: die Zeit
:type t: Union[datetime.datetime, datetime.time]
"""
def __init__(self, t:Union[datetime.datetime, datetime.time]=datetime.datetime.now()) -> None:
self.value = t
def __str__(self) -> str:
return self.value.strftime("%H:%M:%S.%f")[:-3]
class SqlParam():
"""Hilfsklasse, für einen Parameter (?)"""
def __init__(self) -> None:
pass
def __str__(self) -> str:
return "?"
sqlParam = SqlParam()
"""Da SqlParam keinen Zustand hat, reicht ein einzelner statischer Wert"""
def formatSqlValueString(s:str) -> str:
"""
Formatiert einen String für ein Sql-Statement. Der String wird in "'" eingeschlossen
und Hochkomma im Text maskiert.
:param s: der String
:type s: str
:return: der formatierte String
:rtype: str
"""
if (s is None):
return "''";
return "'" + str(s).replace("'", "''") + "'";
SqlValue : TypeAlias = Union[str, int, float, SqlParam, SqlField, SqlFixed, SqlDate, SqlDateTime, datetime.datetime, datetime.date, datetime.time]
"""Union-Type aller unterstützter SQL-Werte"""
def formatSqlValue(v : SqlValue) -> str:
"""
Formatiert einen Wert für SQL. Je nachdem um welchen Typ es sich handelt, werden andere Formatierungen verwendet.
:param v: der Wert
:type v: SqlValue
:return: der formatierte Wert
:rtype: str
"""
if (v == None):
raise Exception("formatSqlValue: null not supported");
if isinstance(v, (int, float, SqlField)):
return str(v);
elif isinstance(v, str):
return formatSqlValueString(v);
elif isinstance(v, datetime.datetime):
return "'" + str(SqlDateTime(v)) + "'";
elif isinstance(v, datetime.date):
return "'" + str(SqlDate(v)) + "'";
elif isinstance(v, datetime.time):
return "'" + str(SqlTime(v)) + "'";
elif isinstance(v, (SqlDateTime, SqlDate, SqlTime)):
return "'" + str(v) + "'";
elif isinstance(v, (SqlParam, SqlFixed)):
return str(v);
else:
raise Exception("formatSqlValue: unsupported type {}".format(type(v)));
class SqlCondition():
"""Eine abstrakte Sql-Bedingung. Unterklassen erledigen die eigentliche Arbeit."""
def getCondition(self) -> str:
"""
Liefert die Bedingung als String
:return: die Bedingung
:rtype: str
"""
raise Exception("Not implemented");
def __str__(self) -> str:
return self.getCondition();
class SqlConditionPrepared(SqlCondition):
"""Eine einfache Sql-Bedingung, die immer einen festen String zurückgibt."""
def __init__(self, cond : Union[SqlCondition, str]):
self.cond = str(cond);
def getCondition(self) -> str:
return self.cond;
class SqlConditionTrue(SqlConditionPrepared):
"""True-Bedingung"""
def __init__(self) -> None:
super().__init__("(1=1)")
class SqlConditionFalse(SqlConditionPrepared):
"""False-Bedingung"""
def __init__(self) -> None:
super().__init__("(1=0)")
class SqlConditionBool(SqlConditionPrepared):
"""Fixe True-oder-False Bedingung"""
def __init__(self, b : bool):
if b:
super().__init__(SqlConditionTrue())
else:
super().__init__(SqlConditionFalse())
class SqlConditionNot(SqlCondition):
"""
Negation einer anderen Bedingung
:param cond: die zu negierende Bedingung
:type cond: SqlCondition
"""
def __init__(self, cond : SqlCondition):
self.cond = cond;
def getCondition(self) -> str:
return "(not {})".format(self.cond.getCondition());
class SqlConditionIsNull(SqlConditionPrepared):
"""
Wert soll null sein
:param v: das Feld
:type v: SqlValue
"""
def __init__(self, v : SqlValue):
super().__init__("({} is null)".format(formatSqlValue(v)))
class SqlConditionFieldIsNull(SqlConditionIsNull):
def __init__(self, field : str):
super().__init__(SqlField(field))
class SqlConditionIsNotNull(SqlConditionPrepared):
"""
Wert soll nicht null sein
:param v: der Wert
:type v: SqlValue
"""
def __init__(self, v : SqlValue):
super().__init__("({} is not null)".format(formatSqlValue(v)))
class SqlConditionFieldIsNotNull(SqlConditionIsNotNull):
def __init__(self, field : str):
super().__init__(SqlField(field))
class SqlConditionStringStartsWith(SqlConditionPrepared):
"""
Feld soll mit einem bestimmten String beginnen
:param field: das Feld
:type field: str
:param value: der Wert
:type value: str
"""
def __init__(self, field : str, value : str):
cond = "";
if value:
cond="(left({}, {}) = {})".format(normaliseDBfield(field), len(value), formatSqlValueString(value));
else:
cond = "(1=1)"
super().__init__(cond)
class SqlConditionFieldStringNotEmpty(SqlConditionPrepared):
"""
Feld soll nicht den leeren String oder null enthalten.
Der Ausdruck wird wörtlich übernommen.
:param field: das Feld
:type field: str
"""
def __init__(self, field : str):
field = normaliseDBfield(field);
cond="({} is not null and {} != '')".format(field, field);
super().__init__(cond)
class SqlConditionIn(SqlConditionPrepared):
"""
Bedingung der Form 'v in ...'
:param value: der Wert, kann unterschiedliche Typen besitzen
:type value: SqlValue
:param values: die erlaubten Werte
:type values: Sequence[SqlValue]
"""
def __init__(self, value : SqlValue, values : Sequence[SqlValue]):
valuesLen = len(values)
if (valuesLen == 0):
cond : Union[SqlCondition, str] = SqlConditionFalse()
elif (valuesLen == 1):
cond = SqlConditionEq(value, values[0])
else:
valuesS = formatSqlValue(values[0])
for i in range(1, valuesLen):
valuesS += ", " + formatSqlValue(values[i])
cond = "({} in ({}))".format(formatSqlValue(value), valuesS)
super().__init__(cond)
class SqlConditionFieldIn(SqlConditionIn):
def __init__(self, field:str, values : Sequence[SqlValue]):
super().__init__(SqlField(field), values)
class SqlConditionEq(SqlConditionPrepared):
"""
Bedingung der Form 'v1 is null', 'v2 is null', 'v1 = v2', '(1=1)' oder '(0=1)'
:param value1: der Wert, kann unterschiedliche Typen besitzen
:param value2: der Wert, kann unterschiedliche Typen besitzen
"""
def __init__(self, value1 : Optional[Union[SqlValue, bool]], value2 : Optional[Union[SqlValue, bool]]):
cond: Union[SqlCondition, str]
if (value1 is None) and (value2 is None):
cond = SqlConditionTrue()
elif (value1 is None) and not (value2 is None):
if (isinstance(value2, bool)):
cond = SqlConditionFalse()
else:
cond = SqlConditionIsNull(value2)
elif not (value1 is None) and (value2 is None):
if (isinstance(value1, bool)):
cond = SqlConditionFalse()
else:
cond = SqlConditionIsNull(value1)
else:
if isinstance(value1, bool) and isinstance(value2, bool):
cond = SqlConditionBool(value1 == value2);
elif isinstance(value1, bool) and not isinstance(value2, bool):
value2 = cast(SqlValue, value2)
if value1:
cond = "({} = 1)".format(formatSqlValue(value2))
else:
cond = "({} = 0 OR {} is null)".format(formatSqlValue(value2), formatSqlValue(value2));
elif not isinstance(value1, bool) and isinstance(value2, bool):
value1 = cast(SqlValue, value1)
if value2:
cond = "({} = 1)".format(formatSqlValue(value1))
else:
cond = "({} = 0 OR {} is null)".format(formatSqlValue(value1), formatSqlValue(value1))
else:
value1 = cast(SqlValue, value1)
value2 = cast(SqlValue, value2)
cond = "({} = {})".format(formatSqlValue(value1), formatSqlValue(value2));
super().__init__(cond)
class SqlConditionBinComp(SqlConditionPrepared):
"""
Bedingung der Form 'value1 op value2'
:param op: der Vergleichsoperator
:type op: str
:param value1: der Wert, kann unterschiedliche Typen besitzen
:type value1: SqlValue
:param value2: der Wert, kann unterschiedliche Typen besitzen
:type value2: SqlValue
"""
def __init__(self, op : str, value1 : SqlValue, value2 : SqlValue):
if not(value1) or not(value2):
raise Exception("SqlConditionBinComp: value not provided")
cond = "({} {} {})".format(formatSqlValue(value1), op, formatSqlValue(value2));
super().__init__(cond)
class SqlConditionLt(SqlConditionBinComp):
"""
Bedingung der Form 'value1 < value2'
:param value1: der Wert, kann unterschiedliche Typen besitzen
:param value2: der Wert, kann unterschiedliche Typen besitzen
"""
def __init__(self, value1 : SqlValue, value2 : SqlValue):
super().__init__("<", value1, value2)
class SqlConditionLe(SqlConditionBinComp):
"""
Bedingung der Form 'value1 <= value2'
:param value1: der Wert, kann unterschiedliche Typen besitzen
:param value2: der Wert, kann unterschiedliche Typen besitzen
"""
def __init__(self, value1 : SqlValue, value2 : SqlValue):
super().__init__("<=", value1, value2)
class SqlConditionGt(SqlConditionBinComp):
"""
Bedingung der Form 'value1 > value2'
:param value1: der Wert, kann unterschiedliche Typen besitzen
:param value2: der Wert, kann unterschiedliche Typen besitzen
"""
def __init__(self, value1 : SqlValue, value2 : SqlValue):
super().__init__(">", value1, value2)
class SqlConditionGe(SqlConditionBinComp):
"""
Bedingung der Form 'value1 >= value2'
:param value1: der Wert, kann unterschiedliche Typen besitzen
:param value2: der Wert, kann unterschiedliche Typen besitzen
"""
def __init__(self, value1 : SqlValue, value2 : SqlValue):
super().__init__(">=", value1, value2)
class SqlConditionFieldEq(SqlConditionEq):
def __init__(self, field : str, value : Optional[Union[SqlValue, bool]]):
super().__init__(SqlField(field), value)
class SqlConditionFieldLt(SqlConditionLt):
def __init__(self, field : str, value : SqlValue):
super().__init__(SqlField(field), value)
class SqlConditionFieldLe(SqlConditionLe):
def __init__(self, field : str, value : SqlValue):
super().__init__(SqlField(field), value)
class SqlConditionFieldGt(SqlConditionGt):
def __init__(self, field : str, value : SqlValue):
super().__init__(SqlField(field), value)
class SqlConditionFieldGe(SqlConditionGe):
def __init__(self, field : str, value : SqlValue):
super().__init__(SqlField(field), value)
class SqlConditionList(SqlCondition):
"""
Eine SQL Bedingung, die sich aus einer Liste anderer Bedingungen zusammensetzen.
Dies kann eine "AND" oder eine "OR" Liste sein.
:param connector: wie werden Listenelemente verbunden (AND oder OR)
:type connector: str
:param emptyCond: Rückgabewert für leere Liste
:type emptyCond: str
"""
def __init__(self, connector : str, emptyCond : str):
self.connector : str = connector;
self.emptyCond : str = emptyCond;
self.elems : List[SqlCondition] = []
def addCondition(self, cond : SqlCondition | str | None) -> None:
if (cond is None):
return
if not (isinstance(cond, SqlCondition)):
cond = SqlConditionPrepared("("+str(cond)+")");
self.elems.append(cond);
def addConditions(self, *conds : SqlCondition | str | None) -> None:
for cond in conds:
self.addCondition(cond)
def addConditionFieldStringNotEmpty(self, field : str) -> None:
self.addCondition(SqlConditionFieldStringNotEmpty(field));
def addConditionFieldIn(self, field : str, values : Sequence[SqlValue]) -> None:
self.addCondition(SqlConditionFieldIn(field, values));
def addConditionFieldEq(self, field : str, value : Optional[Union[SqlValue, bool]]) -> None:
self.addCondition(SqlConditionFieldEq(field, value));
def addConditionFieldsEq(self, field1 : str, field2 : str) -> None:
self.addCondition(SqlConditionEq(SqlField(field1), SqlField(field2)));
def addConditionEq(self, value1 : Optional[Union[SqlValue, bool]], value2 : Optional[Union[SqlValue, bool]]) -> None:
self.addCondition(SqlConditionEq(value1, value2));
def addConditionGe(self, value1 : SqlValue, value2 : SqlValue) -> None:
self.addCondition(SqlConditionGe(value1, value2));
def addConditionFieldGe(self, field : str, value : SqlValue) -> None:
self.addCondition(SqlConditionGe(SqlField(field), value));
def addConditionFieldsGe(self, field1 : str, field2 : str) -> None:
self.addCondition(SqlConditionGe(SqlField(field1), SqlField(field2)));
def addConditionLe(self, value1 : SqlValue, value2 : SqlValue) -> None:
self.addCondition(SqlConditionLe(value1, value2));
def addConditionFieldLe(self, field : str, value : SqlValue) -> None:
self.addCondition(SqlConditionLe(SqlField(field), value));
def addConditionFieldsLe(self, field1 : str, field2 : str) -> None:
self.addCondition(SqlConditionLe(SqlField(field1), SqlField(field2)));
def addConditionGt(self, value1 : SqlValue, value2 : SqlValue) -> None:
self.addCondition(SqlConditionGt(value1, value2));
def addConditionFieldGt(self, field : str, value : SqlValue) -> None:
self.addCondition(SqlConditionGt(SqlField(field), value));
def addConditionFieldsGt(self, field1 : str, field2 : str) -> None:
self.addCondition(SqlConditionGt(SqlField(field1), SqlField(field2)));
def addConditionLt(self, value1 : SqlValue, value2 : SqlValue) -> None:
self.addCondition(SqlConditionLt(value1, value2));
def addConditionFieldLt(self, field : str, value : SqlValue) -> None:
self.addCondition(SqlConditionLt(SqlField(field), value));
def addConditionFieldsLt(self, field1 : str, field2 : str) -> None:
self.addCondition(SqlConditionLt(SqlField(field1), SqlField(field2)));
def addConditionFieldIsNull(self, field : str) -> None:
self.addCondition(SqlConditionFieldIsNull(field));
def addConditionFieldIsNotNull(self, field : str) -> None:
self.addCondition(SqlConditionFieldIsNotNull(field));
def isEmpty(self) -> bool:
return not(self.elems);
def getCondition(self) -> str:
match (len(self.elems)):
case 0:
return self.emptyCond;
case 1:
return self.elems[0].getCondition();
case l:
res = "(" + self.elems[0].getCondition();
for i in range(1, l):
res += " {} {}".format(self.connector, self.elems[i].getCondition())
res += ")";
return res;
class SqlConditionDateTimeFieldInRange(SqlConditionPrepared):
"""
Liegt Datetime in einem bestimmten Zeitraum?
:param field: das Feld
:type field: str
:param datetimeVon: der untere Wert (einschließlich), None erlaubt beliebige Zeiten
:param datetimeBis: der obere Wert (ausschließlich), None erlaubt beliebige Zeiten
"""
def __init__(self, field : str, datetimeVon : datetime.datetime|None, datetimeBis : datetime.datetime|None):
cond = SqlConditionAnd()
if not (datetimeVon is None):
cond.addConditionFieldGe(field, datetimeVon)
if not (datetimeBis is None):
cond.addConditionFieldLt(field, datetimeBis)
super().__init__(str(cond))
class SqlConditionDateTimeFieldInMonth(SqlConditionPrepared):
"""
Liegt Datetime in einem bestimmten Monat?
:param field: das Feld
:type field: string
:param year: das Jahr
:param month: der Monat
"""
def __init__(self, field : str, year : int, month : int):
if month == 12:
nyear=year+1
nmonth=1;
else:
nyear=year
nmonth=month+1;
cond = SqlConditionDateTimeFieldInRange(field,
datetime.datetime(year=year, month=month, day=1),
datetime.datetime(year=nyear, month=nmonth, day=1))
super().__init__(str(cond))
class SqlConditionDateTimeFieldInYear(SqlConditionPrepared):
"""
Liegt Datetime in einem bestimmten Jahr?
:param field: das Feld
:type field: str
:param year: das Jahr
"""
def __init__(self, field : str, year :int) -> None:
nyear=year+1
cond = SqlConditionDateTimeFieldInRange(field,
datetime.datetime(year=year, month=1, day=1),
datetime.datetime(year=nyear, month=1, day=1))
super().__init__(str(cond))
class SqlConditionDateTimeFieldInDay(SqlConditionPrepared):
"""
Liegt Datetime in einem bestimmten Monat?
:param field: das Feld
:type field: str
:param year: das Jahr
:param month: der Monat
:param day: der Tag
"""
def __init__(self, field : str, year : int, month : int, day : int) -> None:
d = datetime.datetime(year=year, month=month, day=day)
cond = SqlConditionDateTimeFieldInRange(field,
d,
d + datetime.timedelta(days=1))
super().__init__(str(cond))
class SqlConditionAnd(SqlConditionList):
def __init__(self, *conds : Union[SqlCondition, str]) -> None:
super().__init__("AND", "(1=1)")
self.addConditions(*conds)
class SqlConditionOr(SqlConditionList):
def __init__(self, *conds : Union[SqlCondition, str]) -> None:
super().__init__("OR", "(1=0)")
self.addConditions(*conds)
class SqlJoin():
"""
Ein abstrakter Sql-Join
:param joinType: Art des Joins, wird in SQL übernommen, z.B. "LEFT JOIN".
:type joinType: str
:param table: die Tabelle, die gejoint werden soll
:type table: str
:param conds: Bedingungen, die bereits hinzugefügt werden soll. Weitere können über Attribut `on` hinzugefügt werden.
"""
def __init__(self, joinType : str, table : str, *conds : Union[SqlCondition, str]) -> None:
self.joinType = joinType
self.table = table
self.on : SqlConditionAnd = SqlConditionAnd(*conds)
"""Bedingung des Joins, kann noch nachträglich erweitert werden"""
def getJoin(self) -> str:
"""
Liefert den Join als String
"""
return self.joinType + " " + self.table + " ON " + self.on.getCondition();
def __str__(self) -> str:
return self.getJoin();
class SqlInnerJoin(SqlJoin):
"""
Ein Inner-Join.
:param table: die Tabelle, die gejoint werden soll
:type table: str
:param conds: Bedingungen, die bereits hinzugefügt werden soll. Weitere können über Attribut `on` hinzugefügt werden.
"""
def __init__(self, table : str, *conds : Union[SqlCondition, str]) -> None:
super().__init__("INNER JOIN", table, *conds)
class SqlLeftJoin(SqlJoin):
"""
Ein Left-Join.
:param table: die Tabelle, die gejoint werden soll
:type table: str
:param conds: Bedingungen, die bereits hinzugefügt werden soll. Weitere können über Attribut `on` hinzugefügt werden.
"""
def __init__(self, table : str, *conds : Union[SqlCondition, str]) -> None:
super().__init__("LEFT JOIN", table, *conds)
class SqlStatementSelect():
"""
Klasse, um einfache Select-Statements zu bauen.
:param table: die Haupt-Tabelle
:type table: str
:param fields: kein oder mehrere Felder, die selektiert werden sollen
"""
def __init__(self, table : str, *fields : str) -> None:
self.table : str = table
"""die Tabelle"""
self.top : int = 0
"""wie viele Datensätze auswählen? 0 für alle"""
self.where : SqlConditionList = SqlConditionAnd();
"""die Bedingung, Default ist True"""
self.fields : List[str] = []
"""Liste von auszuwählenden Feldern"""
self.addFields(*fields)
self.joins : List[SqlJoin|str] = []
"""Joins mit extra Tabellen"""
self.groupBy : List[str] = [];
"""die Bedingung, Default ist True"""
self.having : SqlConditionList = SqlConditionAnd();
"""die Bedingung having, Default ist True"""
self.order : Optional[str] = None
"""Sortierung"""
def __str__(self) -> str:
return self.getSql();
def addFields(self, *fields : str) -> None:
"""Fügt ein oder mehrere Felder, also auszuwählende Werte zu einem SQL-Statement hinzu."""
for f in fields:
if not (f == None):
self.fields.append(f)
def addGroupBy(self, *fields : str) -> None:
"""Fügt ein oder mehrere GroupBy Felder zu einem SQL-Statement hinzu."""
for f in fields:
if not (f == None):
self.groupBy.append(f)
def setTop(self, t : int) -> None:
"""Wie viele Datensätze sollen maximal zurückgeliefert werden? 0 für alle"""
self.top = t
def addFieldsTable(self, table : str, *fields : str) -> None:
"""
Fügt ein oder mehrere Felder, die zu einer Tabelle gehören zu einem SQL-Statement hinzu.
Felder sind Strings. Vor jeden dieser Strings wird die Tabelle mit einem Punkt getrennt gesetzt.
Dies kann im Vergleich zu 'addFields' Schreibarbeit erleitern.
"""
for f in fields:
if not (f == None):
self.fields.append(table + "." + str(f))
def addJoin(self, j : SqlJoin|str) -> None:
"""Fügt ein Join zum SQL-Statement hinzu. Beispiel: 'LEFT JOIN personal p ON t.UPDUSER = p.PERSONAL'"""
self.joins.append(j)
def addLeftJoin(self, table : str, *conds : Union[SqlCondition, str]) -> SqlLeftJoin:
j = SqlLeftJoin(table, *conds)
self.addJoin(j)
return j
def addInnerJoin(self, table : str, *conds : Union[SqlCondition, str]) -> SqlInnerJoin:
j = SqlInnerJoin(table, *conds)
self.addJoin(j)
return j
def getSql(self) -> str:
"""Liefert das SQL-SELECT-Statement als String"""
def getFields() -> str:
match (len(self.fields)):
case 0:
return "*";
case 1:
return str(self.fields[0]);
case l:
res = str(self.fields[0]);
for i in range(1, l):
res += ", " + str(self.fields[i]);
return res;
def getGroupBy() -> str:
match (len(self.groupBy)):
case 0:
return "";
case l:
res = " GROUP BY " + str(self.fields[0])
for i in range(1, l):
res += ", " + str(self.fields[i]);
if not (self.having.isEmpty()):
res += " HAVING " + str(self.having)
return res;
def getJoins() -> str:
match (len(self.joins)):
case 0:
return ""
case l:
res = "";
for i in range(0, l):
res += " " + str(self.joins[i])
return res
def getWhere() -> str:
if self.where.isEmpty():
return ""
else:
return " WHERE " + str(self.where)
def getOrder() -> str:
if self.order == None:
return ""
else:
return " ORDER BY " + str(self.order)
def getTop() -> str:
if self.top <= 0:
return ""
else:
return "TOP " + str(self.top) + " "
return "SELECT " + getTop() + getFields() + " FROM " + self.table + getJoins() + getWhere() + getGroupBy() + getOrder();
SqlStatement : TypeAlias = Union [SqlStatementSelect, str]

56
src/PyAPplus64/utils.py Normal file
View File

@ -0,0 +1,56 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
#-*- coding: utf-8 -*-
import pathlib
import datetime
from typing import *
def checkDirExists(dir : Union[str, pathlib.Path]) -> pathlib.Path:
"""Prüft, ob ein Verzeichnis existiert. Ist dies nicht möglich, wird eine Exception geworfen.
:param dir: das Verzeichnis
:type dir: Union[str, pathlib.Path]
:return: den normalisierten Pfad
:rtype: pathlib.Path
"""
if not (isinstance(dir, pathlib.Path)):
dir = pathlib.Path(str(dir))
dir = dir.resolve()
if not (dir.exists()):
raise Exception("Verzeichnis '" + str(dir) + "' nicht gefunden");
if not (dir.is_dir()):
raise Exception("'" + str(dir) + "' ist kein Verzeichnis");
return dir;
def formatDateTimeForAPplus(v : Union[datetime.datetime, datetime.date, datetime.time]) -> str:
"""Formatiert ein Datum oder eine Uhrzeit für APplus"""
if (v == None):
return "";
elif isinstance(v, str):
return v;
elif isinstance(v, datetime.datetime):
return v.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
elif isinstance(v, datetime.date):
return v.strftime("%Y-%m-%d")
elif isinstance(v, datetime.time):
return v.strftime("%H:%M:%S.%f")[:-3]
else:
return str(v)
def containsOnlyAllowedChars(charset : Set[str], s : str) -> bool:
"""Enthält ein String nur erlaubte Zeichen?"""
for c in s:
if not (c in charset):
return False
return True

23
tests/test_applus_db.py Normal file
View File

@ -0,0 +1,23 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
from PyAPplus64 import applus_db
import datetime
def test_DBTableIDs1() -> None:
ids = applus_db.DBTableIDs();
assert (str(ids) == "{}")
ids.add("t1", 1)
assert (str(ids) == "{'T1': {1}}")
ids.add("t1", 2,3,4)
assert (str(ids) == "{'T1': {1, 2, 3, 4}}")
assert (ids.getTable("T1") == {1, 2, 3, 4})
assert (ids.getTable("T2") == set())
ids.add("t2", 2,3,4)
assert (ids.getTable("T2") == {2,3,4})
assert (str(ids) == "{'T1': {1, 2, 3, 4}, 'T2': {2, 3, 4}}")

351
tests/test_sql_utils.py Normal file
View File

@ -0,0 +1,351 @@
# Copyright (c) 2023 Thomas Tuerk (kontakt@thomas-tuerk.de)
#
# This file is part of PyAPplus64 (see https://www.thomas-tuerk.de/de/pyapplus64).
#
# Use of this source code is governed by an MIT-style
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
from PyAPplus64 import sql_utils
import datetime
def test_normaliseDBField1() -> None:
assert (sql_utils.normaliseDBfield("aAa") == "AAA")
assert (sql_utils.normaliseDBfield("a#Aa") == "A#AA")
assert (sql_utils.normaliseDBfield("2") == "2")
def test_normaliseDBFieldSet() -> None:
assert (sql_utils.normaliseDBfieldSet(set()) == set())
assert (sql_utils.normaliseDBfieldSet({"aAa", "b", "c", "2"}) == {"2", "AAA", "B", "C"})
def test_normaliseDBFieldList() -> None:
assert (sql_utils.normaliseDBfieldList([]) == [])
assert (sql_utils.normaliseDBfieldList(["aAa", "b", "c", "2"]) == ["AAA", "B", "C", "2"])
def test_SqlField1() -> None:
assert (str(sql_utils.SqlField("abc")) == "ABC")
def test_SqlField2() -> None:
assert (str(sql_utils.SqlField("t.abc")) == "T.ABC")
def test_SqlParam() -> None:
assert (str(sql_utils.sqlParam) == "?")
def test_SqlDateTime() -> None:
dt = datetime.datetime(year=2023, month=1, day=12, hour=9, minute=59, second=12, microsecond=2344)
assert (str(sql_utils.SqlDateTime(dt)) == "2023-01-12T09:59:12.002")
def test_SqlDate() -> None:
dt = datetime.datetime(year=2023, month=1, day=12, hour=9, minute=59, second=12, microsecond=2344)
assert (str(sql_utils.SqlDate(dt)) == "20230112")
def test_formatSqlValueString1() -> None:
assert(sql_utils.formatSqlValueString("") == "''");
def test_formatSqlValueString2() -> None:
assert(sql_utils.formatSqlValueString("abc") == "'abc'");
def test_formatSqlValueString3() -> None:
assert(sql_utils.formatSqlValueString("a b c") == "'a b c'");
def test_formatSqlValueString4() -> None:
assert(sql_utils.formatSqlValueString("a \"b\" c") == "'a \"b\" c'");
def test_formatSqlValueString5() -> None:
assert(sql_utils.formatSqlValueString("a 'b'\nc") == "'a ''b''\nc'");
def test_formatSqlValue1() -> None:
assert(sql_utils.formatSqlValue(2) == "2");
def test_formatSqlValue2() -> None:
assert(sql_utils.formatSqlValue(2.4) == "2.4");
def test_formatSqlValue3() -> None:
assert(sql_utils.formatSqlValue("AA") == "'AA'");
def test_formatSqlValue4() -> None:
assert(sql_utils.formatSqlValue(sql_utils.SqlField("aa")) == "AA");
def test_formatSqlValue5() -> None:
assert(sql_utils.formatSqlValue(0) == "0");
def test_formatSqlValue6() -> None:
dt = datetime.datetime(year=2023, month=1, day=12, hour=9, minute=59, second=12, microsecond=2344)
assert(sql_utils.formatSqlValue(sql_utils.SqlDateTime(dt)) == "'2023-01-12T09:59:12.002'");
def test_SqlConditionTrue() -> None:
assert(str(sql_utils.SqlConditionTrue()) == "(1=1)");
def test_SqlConditionFalse() -> None:
assert(str(sql_utils.SqlConditionFalse()) == "(1=0)");
def test_SqlConditionBool1() -> None:
assert(str(sql_utils.SqlConditionBool(True)) == "(1=1)");
def test_SqlConditionBool2() -> None:
assert(str(sql_utils.SqlConditionBool(False)) == "(1=0)");
def test_SqlConditionIsNull() -> None:
cond = sql_utils.SqlConditionIsNull("AA");
assert(str(cond) == "('AA' is null)");
def test_SqlConditionIsNotNull() -> None:
cond = sql_utils.SqlConditionIsNotNull("AA");
assert(str(cond) == "('AA' is not null)");
def test_SqlConditionNot() -> None:
cond1 = sql_utils.SqlConditionIsNull("AA");
cond = sql_utils.SqlConditionNot(cond1);
assert(str(cond) == "(not ('AA' is null))");
def test_SqlConditionStringStartsWith() -> None:
cond = sql_utils.SqlConditionStringStartsWith("f", "a'an")
assert(str(cond) == "(left(F, 4) = 'a''an')");
def test_SqlConditionIn1() -> None:
cond = sql_utils.SqlConditionIn(sql_utils.SqlField("f"), [])
assert(str(cond) == "(1=0)");
def test_SqlConditionIn2() -> None:
cond = sql_utils.SqlConditionIn(sql_utils.SqlField("f"), ["a"])
assert(str(cond) == "(F = 'a')");
def test_SqlConditionIn3() -> None:
cond = sql_utils.SqlConditionIn(sql_utils.SqlField("f"), ["a", "a'A", "b", "c"])
assert(str(cond) == "(F in ('a', 'a''A', 'b', 'c'))");
def test_SqlConditionStringNotEmpty1() -> None:
cond = sql_utils.SqlConditionFieldStringNotEmpty("f")
assert(str(cond) == "(F is not null and F != '')");
def test_SqlConditionEq1() -> None:
cond = sql_utils.SqlConditionEq("f1", None)
assert(str(cond) == "('f1' is null)");
def test_SqlConditionEq2() -> None:
cond = sql_utils.SqlConditionEq(None, "f1")
assert(str(cond) == "('f1' is null)");
def test_SqlConditionEq3() -> None:
cond = sql_utils.SqlConditionEq(sql_utils.SqlField("f1"), sql_utils.SqlField("f2"))
assert(str(cond) == "(F1 = F2)");
def test_SqlConditionEq4() -> None:
cond = sql_utils.SqlConditionEq(sql_utils.SqlField("f1"), "aa'a")
assert(str(cond) == "(F1 = 'aa''a')");
def test_SqlConditionEq5() -> None:
cond = sql_utils.SqlConditionEq(sql_utils.SqlField("f1"), 2)
assert(str(cond) == "(F1 = 2)");
def test_SqlConditionEq6() -> None:
cond = sql_utils.SqlConditionEq(sql_utils.SqlField("f1"), True)
assert(str(cond) == "(F1 = 1)");
def test_SqlConditionEq7() -> None:
cond = sql_utils.SqlConditionEq(sql_utils.SqlField("f1"), False)
assert(str(cond) == "(F1 = 0 OR F1 is null)");
def test_SqlConditionEq8() -> None:
cond = sql_utils.SqlConditionEq(True, sql_utils.SqlField("f1"))
assert(str(cond) == "(F1 = 1)");
def test_SqlConditionEq9() -> None:
cond = sql_utils.SqlConditionEq(False, sql_utils.SqlField("f1"))
assert(str(cond) == "(F1 = 0 OR F1 is null)");
def test_SqlConditionEq10() -> None:
cond = sql_utils.SqlConditionEq(False, True)
assert(str(cond) == "(1=0)");
def test_SqlConditionEq11() -> None:
cond = sql_utils.SqlConditionEq(True, True)
assert(str(cond) == "(1=1)");
def test_SqlConditionFieldEq1() -> None:
cond = sql_utils.SqlConditionFieldEq("f1", None)
assert(str(cond) == "(F1 is null)");
def test_SqlConditionFieldEq2() -> None:
cond = sql_utils.SqlConditionFieldEq("f1", sql_utils.SqlField("f2"))
assert(str(cond) == "(F1 = F2)");
def test_SqlConditionFieldEq3() -> None:
cond = sql_utils.SqlConditionFieldEq("f1", "aa'a")
assert(str(cond) == "(F1 = 'aa''a')");
def test_SqlConditionFieldEq4() -> None:
cond = sql_utils.SqlConditionFieldEq("f1", 2)
assert(str(cond) == "(F1 = 2)");
def test_SqlConditionFieldEq5() -> None:
cond = sql_utils.SqlConditionFieldEq("f1", sql_utils.sqlParam)
assert(str(cond) == "(F1 = ?)");
def test_SqlConditionLt1() -> None:
cond = sql_utils.SqlConditionLt(sql_utils.SqlField("f"), sql_utils.SqlDate(datetime.date(year=2022, month=12, day=12)))
assert(str(cond) == "(F < '20221212')");
def test_SqlConditionLt2() -> None:
cond = sql_utils.SqlConditionLt(2, sql_utils.SqlField("f"))
assert(str(cond) == "(2 < F)");
def test_SqlConditionGt1() -> None:
cond = sql_utils.SqlConditionGt(sql_utils.SqlField("f"), sql_utils.SqlDate(datetime.date(year=2022, month=12, day=12)))
assert(str(cond) == "(F > '20221212')");
def test_SqlConditionGt2() -> None:
cond = sql_utils.SqlConditionGt(2, sql_utils.SqlField("f"))
assert(str(cond) == "(2 > F)");
def test_SqlConditionLe1() -> None:
cond = sql_utils.SqlConditionLe(sql_utils.SqlField("f"), sql_utils.SqlDate(datetime.date(year=2022, month=12, day=12)))
assert(str(cond) == "(F <= '20221212')");
def test_SqlConditionLe2() -> None:
cond = sql_utils.SqlConditionLe(2, sql_utils.SqlField("f"))
assert(str(cond) == "(2 <= F)");
def test_SqlConditionGe1() -> None:
cond = sql_utils.SqlConditionGe(sql_utils.SqlField("f"), sql_utils.SqlDate(datetime.date(year=2022, month=12, day=12)))
assert(str(cond) == "(F >= '20221212')");
def test_SqlConditionGe2() -> None:
cond = sql_utils.SqlConditionGe(2, sql_utils.SqlField("f"))
assert(str(cond) == "(2 >= F)");
def test_SqlConditionFieldLt1() -> None:
cond = sql_utils.SqlConditionFieldLt("f", sql_utils.SqlDate(datetime.date(year=2022, month=12, day=12)))
assert(str(cond) == "(F < '20221212')");
def test_SqlConditionFieldLe1() -> None:
cond = sql_utils.SqlConditionFieldLe("f", sql_utils.SqlDate(datetime.date(year=2022, month=12, day=12)))
assert(str(cond) == "(F <= '20221212')");
def test_SqlConditionFieldGt1() -> None:
cond = sql_utils.SqlConditionFieldGt("f", sql_utils.SqlDate(datetime.date(year=2022, month=12, day=12)))
assert(str(cond) == "(F > '20221212')");
def test_SqlConditionFieldGe1() -> None:
cond = sql_utils.SqlConditionFieldGe("f", sql_utils.SqlDate(datetime.date(year=2022, month=12, day=12)))
assert(str(cond) == "(F >= '20221212')");
def test_SqlConditionAnd1() -> None:
conj = sql_utils.SqlConditionAnd();
assert(str(conj) == "(1=1)");
def test_SqlConditionAnd2() -> None:
cond1 = sql_utils.SqlConditionPrepared("cond1");
conj = sql_utils.SqlConditionAnd();
conj.addCondition(cond1)
assert(str(conj) == "cond1");
def test_SqlConditionAnd3() -> None:
cond1 = sql_utils.SqlConditionPrepared("cond1");
cond2 = sql_utils.SqlConditionPrepared("cond2");
conj = sql_utils.SqlConditionAnd();
conj.addCondition(cond1)
conj.addCondition(cond2)
assert(str(conj) == "(cond1 AND cond2)");
def test_SqlConditionAnd4() -> None:
cond1 = sql_utils.SqlConditionPrepared("cond1");
cond2 = sql_utils.SqlConditionPrepared("cond2");
cond3 = sql_utils.SqlConditionPrepared("cond3");
conj = sql_utils.SqlConditionAnd();
conj.addCondition(cond1)
conj.addCondition(cond2)
conj.addCondition(cond3)
assert(str(conj) == "(cond1 AND cond2 AND cond3)");
def test_SqlConditionOr1() -> None:
conj = sql_utils.SqlConditionOr();
assert(str(conj) == "(1=0)");
def test_SqlConditionOr2() -> None:
cond1 = sql_utils.SqlConditionPrepared("cond1");
conj = sql_utils.SqlConditionOr();
conj.addCondition(cond1)
assert(str(conj) == "cond1");
def test_SqlConditionOr3() -> None:
cond1 = sql_utils.SqlConditionPrepared("cond1");
cond2 = sql_utils.SqlConditionPrepared("cond2");
conj = sql_utils.SqlConditionOr();
conj.addCondition(cond1)
conj.addCondition(cond2)
assert(str(conj) == "(cond1 OR cond2)");
def test_SqlConditionOr4() -> None:
cond1 = sql_utils.SqlConditionPrepared("cond1");
cond2 = sql_utils.SqlConditionPrepared("cond2");
cond3 = sql_utils.SqlConditionPrepared("cond3");
conj = sql_utils.SqlConditionOr();
conj.addCondition(cond1)
conj.addCondition(cond2)
conj.addCondition(cond3)
assert(str(conj) == "(cond1 OR cond2 OR cond3)");
def test_SqlStatementSelect1() -> None:
sql = sql_utils.SqlStatementSelect("tabelle t")
assert (str(sql) == "SELECT * FROM tabelle t")
sql.setTop(10)
assert (str(sql) == "SELECT TOP 10 * FROM tabelle t")
sql.addFields("f1")
assert (str(sql) == "SELECT TOP 10 f1 FROM tabelle t")
sql.addFields("f2", "f3")
assert (str(sql) == "SELECT TOP 10 f1, f2, f3 FROM tabelle t")
sql.addFieldsTable("t", "f4", "f5")
assert (str(sql) == "SELECT TOP 10 f1, f2, f3, t.f4, t.f5 FROM tabelle t")
sql.having.addConditionFieldGe("f1", 5)
assert (str(sql) == "SELECT TOP 10 f1, f2, f3, t.f4, t.f5 FROM tabelle t")
sql.addGroupBy("f1", "f2")
assert (str(sql) == "SELECT TOP 10 f1, f2, f3, t.f4, t.f5 FROM tabelle t GROUP BY f1, f2 HAVING (F1 >= 5)")
j = sql.addInnerJoin("tabelle2 t2")
j.on.addConditionFieldsEq("t.f1", "t2.F1")
assert (str(sql) == "SELECT TOP 10 f1, f2, f3, t.f4, t.f5 FROM tabelle t INNER JOIN tabelle2 t2 ON (T.F1 = T2.F1) GROUP BY f1, f2 HAVING (F1 >= 5)")
def test_SqlStatementSelect2() -> None:
sql = sql_utils.SqlStatementSelect("t1")
sql.addJoin("left join t2 on cond2")
assert (str(sql) == "SELECT * FROM t1 left join t2 on cond2")
sql.addJoin("left join t3 on cond3")
assert (str(sql) == "SELECT * FROM t1 left join t2 on cond2 left join t3 on cond3")
def test_SqlStatementSelect4() -> None:
sql = sql_utils.SqlStatementSelect("t")
sql.where.addCondition("cond1")
assert (str(sql) == "SELECT * FROM t WHERE (cond1)")
sql.where.addCondition("cond2")
assert (str(sql) == "SELECT * FROM t WHERE ((cond1) AND (cond2))")
def test_SqlStatementSelect5() -> None:
sql = sql_utils.SqlStatementSelect("t")
cond = sql_utils.SqlConditionOr();
sql.where.addCondition(cond)
cond.addCondition("cond1")
assert (str(sql) == "SELECT * FROM t WHERE (cond1)")
cond.addCondition("cond2")
assert (str(sql) == "SELECT * FROM t WHERE ((cond1) OR (cond2))")
def test_SqlStatementSelect6() -> None:
sql = sql_utils.SqlStatementSelect("t")
sql.where = sql_utils.SqlConditionOr();
sql.where.addCondition("cond1")
assert (str(sql) == "SELECT * FROM t WHERE (cond1)")
sql.where.addCondition("cond2")
assert (str(sql) == "SELECT * FROM t WHERE ((cond1) OR (cond2))")