#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Библиотека двумерной графики libGraph2D
---------------------------------------
"""
# TODO:https://stackoverflow.com/questions/11874767/how-do-i-plot-in-real-time-in-a-while-loop
import numpy as np
import time
import matplotlib.pyplot as plt
from matplotlib.widgets import Button
import pickle
from pyquaternion import Quaternion
__author__ = "Alexander Lelekov"
__license__ = "GPLv3-or-later"
__email__ = "a.t.lelekov@yandex.ru"
__version__ = "0.1.2"
NumberOfPoints = int(2**14) # колво точек графика. Если при работе станет больше, то просто увеличиваем массивы вдвое.
[документация]
def superScript(s):
""" для метки, формирует в LaTeX верхние индексы """
if '_' in s:
return '$'+s.replace('_', '^{')+'}$'
else:
return s
[документация]
class DataStore:
""" хранитель данных (внутри используется numpy.ndarray)
простого типа int|float|numpy.ndarray (не кватернион)
можно добавлять точки по одной или чанками:
- размерность m * n, где m-индекс по времени, n-размерность вектора
- выбор на основе длины атрибута объекта, на момент инициализации
- ожидается, что длина чанка chunkLen _не_будет_меняться_
Args:
Obj (object) - объект, атрибут которого будем хранить
attribute (str) - имя атрибута
n (int) - размерность атрибута (если вектор, то его длина)
chunkLen (int) - длина чанка
"""
def __init__(self, Obj, attribute, n=0, chunkLen=1):
self.Obj = Obj
self.attribute = attribute
self.Len = chunkLen
self.n = n
self.Reset()
[документация]
def Reset(self):
""" заполнить массивы NaN-ом и сбросить индекс """
self.Np = NumberOfPoints # текущее колво точек
self.i = 0 # индекс
if self.n==0:
self.Data = np.full((NumberOfPoints), np.NaN) # пустой массив
else:
self.Data = np.full((NumberOfPoints, self.n), np.NaN) # пустой массив
[документация]
def Add(self):
""" добавляем данные поточечно или чанками (с проверкой длины)
индекс инкрементируется внутри Add """
if self.i+self.Len >= self.Np: # проверка - влезет или нет
if self.n==0:
self.Data = np.concatenate((self.Data, np.full((NumberOfPoints), np.NaN)))
else:
self.Data = np.concatenate((self.Data, np.full((NumberOfPoints, self.n), np.NaN))) # увеличиваем массив на Np
self.Np += NumberOfPoints
self.Data[self.i : self.i+self.Len] = getattr(self.Obj, self.attribute)
self.i += self.Len
[документация]
class Line_xy():
""" Менеджер линии. Копит и сохраняет данные, отрисовывает линию.
Простого типа int|float|numpy.ndarray (не вектор/кватернион)
Можно добавлять точки по одной или чанками (см. DataStore)
Добавляет метку на оси: переменная_y(переменная_x)
Args:
ax (axes) - оси, где рисовать линию
time (DataStore) - объект DataStore, хранящий время в поле Data
Obj (object) - объект, атрибут которого будем хранить
Attributes (list2 of str) - имена атрибутов
chunkLen (int) - длина чанка
"""
def __init__(self, ax, time, Obj, Attributes, chunkLen=1):
""" настраиваем объекты DataStore, создаём линию и метку"""
if len(Attributes) != 2 or not (type(Attributes[0]) is str) or not (type(Attributes[1]) is str):
raise ValueError("Attributes (list2 of str) - имена атрибутов")
a1, a2 = Attributes
self.a1 = DataStore(Obj, a1, 1, chunkLen)
self.a2 = DataStore(Obj, a2, 1, chunkLen)
self.ax = ax
self.time = time # указатель на DataStore для времени
label = superScript(a2) + '(' + superScript(a1) + ')'
self.line = self.ax.plot(self.a1.Data, self.a2.Data, label=label)[0]
def Reset(self):
self.a1.Reset()
self.a2.Reset()
[документация]
def Add(self):
""" добавляем данные """
self.a1.Add()
self.a2.Add()
[документация]
def Plot(self):
""" плотим накопленные данные на график """
self.line.set_data(self.a1.Data, self.a2.Data)
[документация]
class PlotXY():
""" Динамический XY график
Динамический 2D график получает данные, каждый цикл копит их внутри,
во внутренних переменных, но отрисовывает их по специальной команде.
Вызов отрисовщика происходит с заданным интервалом в реальном времени
Это не бегущий график, а просто обычный график, в который добавляется информация
с некоторым шагом в реальном времени
Args:
DB (obj): объект базы данных, атрибуты которого содержат данные для отрисовки.
Объект обязательно должен содержать атрибут с текущим временем (Time)
Пример использования в классе задачи:
.. code-block:: python
def Setup(self):
self.plot1 = Plot2D(DB=self)
self.plot1.Setup('t', [['x1','y1'], ['x2','y2']])
def Initialize(self):
self.plot1.Initialize()
def Run(self):
self.plot1.Run() # отправляем результаты расчёта на график
def Finalize(self):
self.plot1.Finalize()
"""
Np = NumberOfPoints
dtPlot = 1. # дельта времени, через которое отрисовываем график
def __init__(self, DB=None):
if DB is None:
raise ValueError('Plot2D: укажите объект базы данных DB')
self.DB = DB
[документация]
def Setup(self, TimeKey, Keys, chunkLen=1, name=None, tight=False, axLabels=None):
""" Метод настраивает вид графика
Args:
TimeKey (str): имя переменной (м.б float|numpy.array|list) в которой хранится время
Keys (list): список переменных БД, которые будут отрисовываться на графиках.
формат Keys
[['x1','y1']] - переменные
длина списка соответствует колву графиков
chunkLen (int): длина чанка данных, который будет передаваться в переменных
name (str): имя графика. Окно графика будет иметь вид 'Figure 1: name'
Указывайте имя фигуры, это а) удобно и б) поможет избежать некоторых ошибок..
axLabels (list2 of str): метки осей, типа r'угол атаки $\alpha,\,^{\circ}$'
"""
self.TimeKey = TimeKey
self.Keys = Keys
self.chunkLen = chunkLen
self.ti = time.time() # текущее время
self.NpCurrent = self.Np
self.tight = tight
# имя фигуры
if name is None:
self.name = self.DB.name
else:
self.name = name
# --- Создаём фигуру. Если уже есть с нашим именем - используем её
figNum = None
# ищем в фигуру, содержащую указанное при инициализации имя name
for i in plt.get_fignums():
figure = plt.figure(i)
name = figure.canvas.manager.get_window_title()
if self.name in name:
figNum = i
if not(figNum is None):
# используем существующую фигуру
self.figure = plt.figure(figNum)
self.figure.clear()
self.ax = plt.subplot(1, 1, 1)
else: # создаём новую
self.figure, self.ax = plt.subplots(1, 1, figsize=(8,6), sharex=True)
figNum = self.figure.number
self.figure.canvas.manager.set_window_title(f'Figure {figNum}: ' + self.name)
self.ax.grid(visible=True) # сетку на всех осях
if tight:
self.figure.tight_layout()
self.axLabels = axLabels
if not (axLabels is None):
self.ax.set_xlabel(axLabels[0])
self.ax.set_ylabel(axLabels[1])
# Структура для хранения между обновлением фигуры
self.Time = DataStore(self.DB, self.TimeKey, chunkLen=self.chunkLen) # хранилка для времени
# создаём линии (по типам атрибутов), легенды
def typeIsOk(typ, val):
return (typ is float) or (typ is np.float64) or (typ is int) or (typ is np.ndarray and len(val.shape)==1)
self.Lines = [] # список линий, по структуре соответствующий Keys
for keysItem in self.Keys:
key1, key2 = keysItem
val1, val2 = getattr(self.DB, key1), getattr(self.DB, key2)
typ1, typ2 = type(val1), type(val2)
if typeIsOk(typ1, val1) and typeIsOk(typ2, val2):
line = Line_xy(self.ax, self.Time, self.DB, keysItem, chunkLen=self.chunkLen)
else:
raise ValueError(f' Error with keys {keysItem} while setup xy line')
self.Lines.append(line)
self.ax.legend()
# self.Plot()
[документация]
def Initialize(self):
""" инициализация графика (очистка при старте) """
# проверкa на закрытие окна
if not plt.fignum_exists(self.figure.number):
self.Setup(TimeKey=self.TimeKey, Keys=self.Keys, chunkLen=self.chunkLen)
else:
self.Reset()
self.Plot()
[документация]
def Run(self):
""" добавление точки (текущего состояния БД) """
# self.Time.Add()
for line in self.Lines:
line.Add()
# плотим накопленные данные на график
if time.time() - self.ti > self.dtPlot:
self.Plot()
[документация]
def Plot(self):
""" плотим накопленные данные на график """
for line in self.Lines:
line.Plot()
self.ti = time.time()
# автомасштабируем оси
self.ax.relim()
self.ax.autoscale_view(True, True, True)
self.figure.canvas.draw_idle()
self.figure.canvas.flush_events()
[документация]
def Finalize(self):
""" финализация - дорисовываем на графике накопленные данные """
self.Plot()
self.figure.canvas.draw_idle()
self.figure.canvas.flush_events()
def Reset(self):
for line in self.Lines:
line.Reset()
# self.Plot()
[документация]
class Line_fiv(DataStore):
""" Менеджер линии. Копит и сохраняет данные от времени (наследует от DataStore),
отрисовывает линию.
Простого типа int|float|numpy.ndarray (не кватернион)
Для отрисовки кватерниона либо перевести его в numpy.ndarray, либо
использовать класс Line_q (правда он работает с ним только поточечно)
Можно добавлять точки по одной или чанками (см. DataStore)
Добавляет красивую метку на оси в LaTeX, в зависимости от размерности:
для одномерного просто своё имя
2D: индексы x, y (предполагается что вектор)
3D: индексы x, y, z (предполагается что вектор)
4D: индексы w, x, y, z (предполагается что кватернион)
Args:
ax (axes) - оси, где рисовать линию
time (DataStore) - объект DataStore, хранящий время в поле Data
Obj (object) - объект, атрибут которого будем хранить
attribute (str) - имя атрибута
n (int) - размерность атрибута (если вектор, то его длина)
chunkLen (int) - длина чанка
"""
def __init__(self, ax, time, Obj, attribute, n=0, chunkLen=1):
super().__init__(Obj, attribute, n, chunkLen)
self.ax = ax
self.time = time # указатель на DataStore для времени
if '_' in attribute:
lPrefix = '$'+attribute.replace('_', '^{')+'}_'
else:
lPrefix = '$'+attribute+'_'
match n:
case 0|1:
label = lPrefix[:-1]+'$'
case 2:
label = [lPrefix+'x$', lPrefix+'y$']
case 3:
label = [lPrefix+'x$', lPrefix+'y$', lPrefix+'z$']
case 4:
label = [lPrefix+'w$', lPrefix+'x$', lPrefix+'y$', lPrefix+'z$']
case _:
label = [lPrefix+str(i) for i in range(n)]
self.lines = self.ax.plot(self.time.Data, self.Data, label=label)
[документация]
def Plot(self):
""" плотим накопленные данные на график """
if self.n>0:
for i in range(self.n):
self.lines[i].set_data(self.time.Data, self.Data[:,i])
else:
self.lines[0].set_data(self.time.Data, self.Data)
[документация]
class Line_q(Line_fiv):
""" класс для линии, принимающей кватернион поточечно
(т.е. может брать данные из кватерниона)
Args:
ax (axes) - оси, где рисовать линию
time (DataStore) - объект DataStore, хранящий время в поле Data
Obj (object) - объект, атрибут которого будем хранить
attribute (str) - имя атрибута (допускается только кватернион)
"""
def __init__(self, ax, time, Obj, attribute):
super().__init__(ax, time, Obj, attribute, n=4)
[документация]
def Add(self):
""" добавляем данные кватерниона поточечно """
if self.i+self.Len >= self.Np: # проверка - влезет или нет
if self.n==0:
np.concatenate((self.Data, np.full((NumberOfPoints), np.NaN)))
else:
self.Data = np.concatenate((self.Data, np.full((NumberOfPoints, self.n), np.NaN))) # увеличиваем массив на Np
self.Np += NumberOfPoints
q = getattr(self.Obj, self.attribute)
for j in range(4):
self.Data[self.i,j] = q[j]
self.i += 1
[документация]
class Plot2DBase():
""" Базовый класс графика. Реализует открытие фигуры, инициализацию и т.п. """
def __init__(self, DB=None):
if DB is None:
raise ValueError('Plot2DBase: укажите объект базы данных DB')
self.DB = DB
[документация]
def Setup(self, nAx, name=None, tight=False):
""" Метод настраивает вид фигуры
Args:
name (str): имя фигуры. Окно фигуры будет иметь вид 'Figure 1: name'
"""
self.name = name or self.DB.name # имя фигуры - None? Заменим на DB.name
# --- Создаём фигуру. Если уже есть с нашим именем - используем её
figNum = None
# ищем в фигуру, содержащую указанное при инициализации имя name
for i in plt.get_fignums():
figure = plt.figure(i)
name = figure.canvas.manager.get_window_title()
if self.name in name:
figNum = i
self.nAx = nAx
if not(figNum is None):
# используем существующую фигуру
self.figure = plt.figure(figNum)
self.figure.clear()
self.Axes = [plt.subplot(self.nAx, 1, self.nAx)]
for i in range(1, self.nAx):
self.Axes.append(plt.subplot(self.nAx, 1, self.nAx - i, sharex=self.Axes[0]))
self.Axes.reverse()
else: # создаём новую
self.figure, self.Axes = plt.subplots(self.nAx, 1, figsize=(8,6), sharex=True)
figNum = self.figure.number
self.figure.canvas.manager.set_window_title(f'Figure {figNum}: ' + self.name)
if self.nAx==1:
self.Axes = [self.Axes]
self.tight = tight
if self.tight:
self.figure.tight_layout()
self.needSetup = False
[документация]
def Initialize(self):
""" инициализация фигуры (очистка при старте) """
# проверкa на закрытие окна
if not plt.fignum_exists(self.figure.number):
self.Setup(self.Keys, name=self.name, tight=self.tight)
self.needSetup = True
def ReLimAxes(self):
for ax in self.Axes: # автомасштабируем оси
ax.relim()
ax.autoscale_view(True, True, True)
def Plot(self):
self.figure.canvas.draw_idle()
self.figure.canvas.flush_events()
[документация]
def Finalize(self):
""" финализация - дорисовываем на фигуре накопленные данные """
self.Plot()
[документация]
class Plot2D():
""" Динамический 2D график
Динамический 2D график получает данные, каждый цикл копит их внутри,
во внутренних переменных, но отрисовывает их по специальной команде.
Вызов отрисовщика происходит с заданным интервалом в реальном времени
Это не бегущий график, а просто обычный график, в который добавляется информация
с некоторым шагом в реальном времени
TODO: наследовать от Plot2DBase
TODO: https://stackoverflow.com/questions/40126176/fast-live-plotting-in-matplotlib-pyplot
Args:
DB (obj): объект базы данных, атрибуты которого содержат данные для отрисовки.
Объект обязательно должен содержать атрибут с текущим временем (Time)
Пример использования в классе задачи:
.. code-block:: python
def Setup(self):
self.plot1 = Plot2D(DB=self)
self.plot1.Setup('t', [['w_B'], ['w_I']])
def Initialize(self):
self.plot1.Initialize()
def Run(self):
self.plot1.Run() # отправляем результаты расчёта на график
def Finalize(self):
self.plot1.Finalize()
"""
Np = NumberOfPoints
dtPlot = 0.5 # дельта времени, через которое отрисовываем график
def __init__(self, DB=None):
if DB is None:
raise ValueError('Plot2D: укажите объект базы данных DB')
self.DB = DB
[документация]
def Setup(self, TimeKey, Keys, chunkLen=1, name=None, tight=False):
""" Метод настраивает вид графика
Args:
TimeKey (str): имя переменной (м.б float|numpy.array|list) в которой хранится время
Keys (list): список переменных БД, которые будут отрисовываться на графиках.
формат Keys
[['g'], - переменная от времени
['g', 'x1']] - две переменные на одном графике
длина списка соответствует колву графиков
chunkLen (int): длина чанка данных, который будет передаваться в переменных
name (str): имя графика. Окно графика будет иметь вид 'Figure 1: name'
"""
self.TimeKey = TimeKey
self.Keys = Keys
self.chunkLen = chunkLen
self.ti = time.time() # текущее время
self.NpCurrent = self.Np
# имя фигуры
if name is None:
self.name = self.DB.name
else:
self.name = name
# --- Создаём фигуру. Если уже есть с нашим именем - используем её
figNum = None
# ищем в фигуру, содержащую указанное при инициализации имя name
for i in plt.get_fignums():
figure = plt.figure(i)
name = figure.canvas.manager.get_window_title()
if self.name in name:
figNum = i
if not(figNum is None):
# используем существующую фигуру
self.figure = plt.figure(figNum)
self.figure.clear()
n = len(self.Keys)
self.Axes = [plt.subplot(n, 1, n)]
for i in range(1, n):
self.Axes.append(plt.subplot(n, 1, n-i, sharex=self.Axes[0]))
self.Axes.reverse()
else: # создаём новую
self.figure, self.Axes = plt.subplots(len(self.Keys), 1, figsize=(8,6), sharex=True)
figNum = self.figure.number
self.figure.canvas.manager.set_window_title(f'Figure {figNum}: ' + self.name)
if len(self.Keys)==1:
self.Axes = [self.Axes]
if tight:
self.figure.tight_layout()
# Структура для хранения между обновлением фигуры
self.Time = DataStore(self.DB, self.TimeKey, chunkLen=self.chunkLen) # хранилка для времени
# создаём линии (по типам атрибутов), легенды
self.Lines = [] # список линий, по структуре соответствующий Keys
for (keysItem, ax) in zip(self.Keys, self.Axes):
ax.grid(visible=True) # сетку на всех осях
for key in keysItem:
val = getattr(self.DB, key)
typ = type(val)
if (typ is float) or (typ is np.float64) or (typ is int):
line = Line_fiv(ax, self.Time, self.DB, key, chunkLen=self.chunkLen)
elif typ is np.ndarray and len(val.shape)==1:
line = Line_fiv(ax, self.Time, self.DB, key, n=val.shape[0], chunkLen=self.chunkLen)
elif typ is list: # должен быть одномерным!
line = Line_fiv(ax, self.Time, self.DB, key, n=len(val), chunkLen=self.chunkLen)
elif typ is np.ndarray and len(val.shape)==2:
line = Line_fiv(ax, self.Time, self.DB, key, n=val.shape[1], chunkLen=self.chunkLen)
elif typ is Quaternion:
line = Line_q(ax, self.Time, self.DB, key) # кватернион только поточечно
else:
raise ValueError(f' No type for {key} (type {typ})')
self.Lines.append(line)
ax.legend()
# создаём кнопки
axButton = self.figure.add_axes([0.9, 0.96, 0.08, 0.035])
# self.btnReset = Button(axButton, 'Reset')
# self.btnReset.on_clicked(self.onResetClicked)
# axButton = self.figure.add_axes([0.8, 0.96, 0.08, 0.035])
self.btnSave = Button(axButton, 'Save')
self.btnSave.on_clicked(self.onSaveClicked)
self.Plot()
[документация]
def Initialize(self):
""" инициализация графика (очистка при старте) """
# проверкa на закрытие окна
if not plt.fignum_exists(self.figure.number):
self.Setup(TimeKey=self.TimeKey, Keys=self.Keys, chunkLen=self.chunkLen)
else:
self.onResetClicked(None)
[документация]
def Run(self):
""" добавление точки (текущего состояния БД) """
self.Time.Add()
for line in self.Lines:
line.Add()
# плотим накопленные данные на график
if time.time() - self.ti > self.dtPlot:
self.Plot()
[документация]
def Plot(self):
""" плотим накопленные данные на график """
for line in self.Lines:
line.Plot()
self.ti = time.time()
for ax in self.Axes: # автомасштабируем оси
ax.relim()
ax.autoscale_view(True, True, True)
self.figure.canvas.draw_idle()
self.figure.canvas.flush_events()
[документация]
def Finalize(self):
""" финализация - дорисовываем на графике накопленные данные """
self.Plot()
self.figure.canvas.draw_idle()
self.figure.canvas.flush_events()
[документация]
def onResetClicked(self, event):
""" обработчик нажатия на Reset """
self.Time.Reset()
for line in self.Lines:
line.Reset()
self.Plot()
[документация]
def onSaveClicked(self, event):
""" обработчик нажатия на Save
сохраняем данные всех линий в файл pickle с уникальным именем """
D = {'time': self.Time.Data}
for line in self.Lines:
D[line.attribute] = line.Data
ms = int((time.time() % 1)*1000 // 1) # время в миллисекундах, для уникального имени
fileName = time.strftime("%y%m%d_%H-%M-%S-") + str(ms) + '_' + self.DB.name + '_data.pickle'
with open(fileName, 'wb') as f:
pickle.dump(D, f)
print('[+] Data saved in task ' + self.name + ', file:' + fileName)
#TODO: сделать сохранение при работающей задаче
[документация]
class Storager():
""" Хранитель данных. Сохраняет данные внутри себя (в numpy.ndarray),
затем по финализации сохраняет их в файл
DB (obj): объект базы данных, атрибуты которого содержат данные для отрисовки.
Объект обязательно должен содержать атрибут с текущим временем (Time)
Пример использования в классе задачи:
.. code-block:: python
def Setup(self):
self.storage = Storager(DB=self)
self.storage.Setup('t', ['w_B', 'B_B'])
def Initialize(self):
self.storage.Initialize()
def Run(self):
self.storage.Run()
def Finalize(self):
self.storage.Finalize()
"""
Np = NumberOfPoints
def __init__(self, DB=None):
if DB is None:
raise ValueError('Storager: укажите объект базы данных DB')
self.DB = DB
[документация]
def Setup(self, TimeKey, Keys, chunkLen=1, directory=''):
""" Метод настраивает Хранителя
Args:
TimeKey (str): имя переменной (м.б float|numpy.array|list) в которой хранится время
Keys (list of str): список переменных БД, которые будут хранится.
chunkLen (int): длина чанка данных, который будет передаваться в переменных
directory (str): директория, куда сохранять файлы, с завершающим '/' или '\'
"""
self.TimeKey = TimeKey
self.Keys = Keys
self.chunkLen = chunkLen
self.ti = time.time() # текущее время
self.NpCurrent = self.Np
self.directory = directory
# Структура для хранения между обновлением фигуры
self.Time = DataStore(self.DB, self.TimeKey, chunkLen=self.chunkLen) # хранилка для времени
self.Items = [] # словарь хранилок, по структуре соответствующий Keys
for key in Keys:
val = getattr(self.DB, key)
typ = type(val)
if (typ is float) or (typ is np.float64) or (typ is int):
item = DataStore(self.DB, key, chunkLen=self.chunkLen)
elif typ is np.ndarray and len(val.shape)==1:
item = DataStore(self.DB, key, n=val.shape[0], chunkLen=self.chunkLen)
elif typ is list: # должен быть одномерным!
item = DataStore(self.DB, key, n=len(val), chunkLen=self.chunkLen)
elif typ is np.ndarray and len(val.shape)==2:
item = DataStore(self.DB, key, n=val.shape[1], chunkLen=self.chunkLen)
elif typ is Quaternion:
item = DataStore(self.DB, key) # кватернион только поточечно
else:
raise ValueError(f' No type for {key}')
self.Items.append(item)
[документация]
def Initialize(self):
""" инициализация Хранителя (очистка при старте) """
self.Time.Reset()
for item in self.Items:
item.Reset()
[документация]
def Run(self):
""" добавление точки (текущего состояния БД) """
self.Time.Add()
for item in self.Items:
item.Add()
def removeNaNs(self):
# TODO: перенести в DataStore
idx = np.where(np.isnan(self.Time.Data))[0][0] # с него начинаются NaN
self.Time.Data = self.Time.Data[:idx]
for item in self.Items:
item.Data = item.Data[:idx]
[документация]
def Finalize(self, fileName=None):
""" финализация - сохраняем накопленные данные
Args:
fileName (str): имя файла.
None - данные будут сохранены в поле Res в словаре
'pickle' | 'csv' - будет сгенерировано уникальное имя (дата-время-имя задачи)
строка с полным именем - сохранится с форматом по .endswith
"""
idx = np.where(np.isnan(self.Time.Data))[0][0] # с него начинаются NaN
if fileName is None:
# self.removeNaNs() # ??
self.Res = {'time': self.Time.Data[:idx]}
for item in self.Items:
self.Res[item.attribute] = item.Data[:idx]
print('[+] Task ' + self.DB.name + ': Results stored in .Res field of Storager object.')
elif fileName is str:
if fileName=='pickle' or fileName=='csv': # генерируем уникальное имя
ms = int((time.time() % 1)*1000 // 1) # время в миллисекундах, для уникального имени
fileName = time.strftime("%y%m%d_%H-%M-%S-") + str(ms) + '_' + self.DB.name + '_data.' + fileName
if fileName.endswith('pickle'):
D = {'time': self.Time.Data[:idx]}
for item in self.Items:
D[item.attribute] = item.Data[:idx]
with open(self.directory + fileName, 'wb') as f:
pickle.dump(D, f)
print('[+] Data saved in task ' + self.DB.name + ', file:' + fileName)
elif fileName.endswith('csv'):
M = np.expand_dims(self.Time.Data[:idx], axis=1)
header = 't\t'
for item in self.Items:
N = item.Data[:idx,:]
if len(N.shape)>1:
n = N.shape[1]
match n:
case 1:
attrHeader = ['']
case 2:
attrHeader = ['_x', '_y']
case 3:
attrHeader = ['_x', '_y', '_z']
case 4:
attrHeader = ['_w', '_x', '_y', '_z']
case _:
attrHeader = ['_'+str(i) for i in range(n)]
for i in range(n):
header += item.attribute + attrHeader[i] + '\t'
else:
header += item.attribute + '\t'
M = np.hstack((M,N))
np.savetxt(self.directory + fileName, M, fmt='%.5e', delimiter='\t', header=header)
print('[+] Data saved in task ' + self.DB.name + ', file:' + fileName)
else:
print('[!] Warning in storager ' + self.DB.name + ', unexpected fileName:' + fileName)
[документация]
class Contour():
""" Менеджер графика линий уровня. Копит и сохраняет данные, отрисовывает линию.
Простого типа int|float|numpy.ndarray (не вектор/кватернион)
Можно добавлять точки по одной или чанками (см. DataStore)
Добавляет метку на оси: z(x, y)
Args:
ax (axes) - оси, где рисовать линию
Obj (object) - объект, атрибут которого будем отрисовывать
keyX, keyY, keyZ - имена атрибутов
"""
def __init__(self, ax, Obj, keyX, keyY, keyZ):
""" настраиваем объекты DataStore, создаём линию и метку
TODO: перенести в Setup """
self.keyX, self.keyY, self.keyZ = keyX, keyY, keyZ
self.Obj = Obj
self.ax = ax
label = superScript(self.keyX)
label += ' (' + superScript(self.keyY) + ', ' + superScript(self.keyZ) + ')'
X = getattr(self.Obj, self.keyX)
Y = getattr(self.Obj, self.keyY)
Z = getattr(self.Obj, self.keyZ)
self.contour = self.ax.contourf(X, Y, Z)
# def Add(self):
# """ добавляем данные """
# pass
[документация]
def Plot(self):
""" плотим накопленные данные на график """
X = getattr(self.Obj, self.keyX)
Y = getattr(self.Obj, self.keyY)
Z = getattr(self.Obj, self.keyZ)
self.contour.remove()
self.contour = self.ax.contourf(X, Y, Z)
[документация]
class PlotXYcnt(PlotXY):
""" plotXY + contour (по матрице) """
dtPlot = 1.5 # дельта времени, через которое отрисовываем график
[документация]
def Setup(self, TimeKey, Keys, aedb=None, chunkLen=1, name=None, tight=False):
""" aedb (dict): словарь с именами матриц, которые будут использоваться в contour
Ключи 'x', 'y', 'z' содержат имена
"""
super().Setup(TimeKey=TimeKey, Keys=Keys, chunkLen=chunkLen, name=name, tight=tight)
if aedb is None:
raise Exception('[!] не задан словарь aedb')
self.aedb = aedb
self.keyX, self.keyY, self.keyZ = aedb['x'], aedb['y'], aedb['z']
self.contour = Contour(self.ax, self.DB, self.keyX, self.keyY, self.keyZ)
[документация]
def Initialize(self):
""" инициализация графика (очистка при старте) """
# проверкa на закрытие окна
if not plt.fignum_exists(self.figure.number):
self.Setup(TimeKey=self.TimeKey, Keys=self.Keys, chunkLen=self.chunkLen,
name=self.name, tight=self.tight, aedb=self.aedb)
self.Plot()
[документация]
def Run(self):
""" плотим накопленные данные на график """
self.contour.Plot()
super().Plot()