
Repository

binarymixupdetection

CSE Institute: sRMC Project
Project: sRMC – Safety related Remote Process Monitoring & Control
Readme

This repository accompanies the publication titled "Enhancing acoustic leak detection with data augmentation: overcoming background noise challenges". It contains the source code and resources outlined below:

  1. STFT Data Processing: Code to process audio inputs into 384×384 labeled STFT spectrograms.
  2. Audio Augmentation: Methods for the linear (mixup) combination of background noise with leak sounds; a short sketch follows this list.
  3. CNNs for Classification:
  • Johnson et al. (2020) Network: Rebuild of the model from Johnson et al. (2020)
  • (A)-AlexNet: Modified version tailored for (384,384) input
  • (A)-VGG16: Modified version tailored for (384,384) input
  • (A)-ResNet18: Modified version tailored for (384,384) input
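
The augmentation referenced in item 2 is a plain mixup of two magnitude spectrograms. A minimal sketch of the operation (the arrays here are illustrative placeholders; the actual implementation is in the mixup technique script below):

import numpy as np

lam = 0.5  # mixing coefficient, as used in the mixup script below
leak_spec = np.random.rand(384, 384).astype(np.float32)        # placeholder leak spectrogram
background_spec = np.random.rand(384, 384).astype(np.float32)  # placeholder background spectrogram

# element-wise linear combination of the two spectrograms
mixed_spec = lam * leak_spec + (1 - lam) * background_spec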

Each section of the repository is designed to support different stages of the audio processing and machine learning workflow detailed in our publication.

For further details, please refer to the individual directories and documentation within the repository.

(A)-AlexNet

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torchmetrics
from torchsummary import summary
from sklearn.metrics import precision_score, recall_score
import random

os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Paths
train_folder_path = 'trainpath'
val_folder_path = 'valpath'
test_folder_path = 'testpath'

# Hyperparameters
num_epochs = 50
learning_rate = 0.00001
batch_size = 32
audio_length = 'for_description'  # free-text note shown in the plot title


class CustomDataset(Dataset):
    def __init__(self, file_list):
        self.file_list = file_list

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        with np.load(self.file_list[idx]) as data_file:
            stft = data_file['stft']
            lbls = 1 if data_file['labels'] == 'JA' else 0
        if stft.size == 0 or np.isnan(stft).any():
            print(f"Invalid data at index {idx}, trying another sample")
            return self.__getitem__(np.random.randint(len(self.file_list)))
        stft = (stft - np.mean(stft)) / (np.std(stft) + 1e-10)  # Z-score normalization
        return torch.tensor(stft, dtype=torch.float32), torch.tensor(lbls, dtype=torch.float32)


def main():
    train_files = [os.path.join(train_folder_path, file) for file in os.listdir(train_folder_path) if file.endswith(".npz")]
    val_files = [os.path.join(val_folder_path, file) for file in os.listdir(val_folder_path) if file.endswith(".npz")]
    test_files = [os.path.join(test_folder_path, file) for file in os.listdir(test_folder_path) if file.endswith(".npz")]

    # Create datasets
    train_data = CustomDataset(train_files)
    random.shuffle(val_files)
    random.shuffle(test_files)
    val_data = CustomDataset(val_files)
    test_data = CustomDataset(test_files)

    # Create DataLoaders
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=44)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=44)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=44)

    class Net(nn.Module):
        def __init__(self):  # AlexNet
            super(Net, self).__init__()
            # Feature extraction layers
            self.features = nn.Sequential(
                nn.Conv2d(1, 64, kernel_size=11, stride=4, padding=2),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=3, stride=2),
                nn.Conv2d(64, 192, kernel_size=5, padding=2),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=3, stride=2),
                nn.Conv2d(192, 384, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(384, 256, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(256, 256, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=3, stride=2),
            )

            # Fully connected layers
            self.classifier = nn.Sequential(
                nn.Dropout(),
                nn.Linear(256 * 11 * 11, 4096),  # 256 x 11 x 11 feature map for a 384x384 input
                nn.ReLU(inplace=True),
                nn.Dropout(),
                nn.Linear(4096, 4096),
                nn.ReLU(inplace=True),
                nn.Linear(4096, 1),
                nn.Sigmoid()  # Sigmoid activation for binary classification
            )

        def forward(self, x):
            x = self.features(x)
            x = x.view(x.size(0), 256 * 11 * 11)
            x = self.classifier(x)
            return x

    # Create model
    model = Net()

    # Train on GPU
    device = torch.device('cuda')
    model = model.to(device)

    # Model summary
    summary(model, (1, 384, 384))  # the network expects the 384x384 STFT inputs described in the Readme

    # Optimizer and loss
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.BCELoss()  # binary cross-entropy loss

    # Lists for metrics
    train_acc_list = []
    val_acc_list = []
    val_prec_list = []
    val_recall_list = []
    train_loss_list = []
    val_loss_list = []

    # Training
    for epoch in range(num_epochs):
        model.train()
        test_loss = 0.0
        correct = 0
        total = 0
        running_loss = 0.0

        # Define metrics
        accuracy = torchmetrics.Accuracy('binary').to(device)
        precision = torchmetrics.Precision('binary').to(device)
        recall = torchmetrics.Recall('binary').to(device)

        for i, data in enumerate(train_loader, 0):
            inputs, labels = data[0].float().to(device), data[1].float().to(device)
            optimizer.zero_grad()

            outputs = model(inputs).squeeze()
            predicted = (outputs.data > 0.5).float().squeeze()

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Optional gradient inspection
            for name, param in model.named_parameters():
                if param.requires_grad:
                    grad_mean = param.grad.mean() if param.grad is not None else 'No gradient'
                    # print(f"Layer: {name}, Weight: {param.data.mean()}, Gradient: {grad_mean}")

            running_loss += loss.item()

            # Calculate metrics
            train_acc = accuracy(predicted, labels)
            train_prec = precision(predicted, labels)
            train_recall = recall(predicted, labels)

            # Print progress
            print(
                f'Epoch [{epoch + 1}/{num_epochs}], Batch [{i + 1}/{len(train_loader)}], Train Loss: {loss.item()}, Train Accuracy: {train_acc}, Train Precision: {train_prec}, Train Recall: {train_recall}')

        # Reset validation metrics every epoch
        val_accuracy = torchmetrics.Accuracy('binary').to(device)
        val_precision = torchmetrics.Precision('binary').to(device)
        val_recall = torchmetrics.Recall('binary').to(device)
        val_running_loss = 0.0

        # Validation
        with torch.no_grad():
            for data in val_loader:
                images, labels = data[0].float().to(device), data[1].float().to(device)
                outputs = model(images).squeeze()
                predicted = (outputs.data > 0.5).float().squeeze()

                loss = criterion(outputs, labels)
                val_running_loss += loss.item()

                # Update validation metrics
                val_accuracy(predicted, labels)
                val_precision(predicted, labels)
                val_recall(predicted, labels)

        # Compute final validation metrics
        final_val_acc = val_accuracy.compute()
        final_val_prec = val_precision.compute()
        final_val_recall = val_recall.compute()

        # Save metrics after every epoch
        train_acc_list.append(train_acc.item())
        val_acc_list.append(final_val_acc.item())
        val_prec_list.append(final_val_prec.item())
        val_recall_list.append(final_val_recall.item())
        train_loss_list.append(running_loss / (i + 1))
        val_loss_list.append(val_running_loss / len(val_loader))

        print(
            f'Epoch {epoch + 1} val_loss: {val_running_loss / len(val_loader)}, val_acc: {final_val_acc}, val_prec: {final_val_prec}, val_recall: {final_val_recall}')

    # Test loop
    with torch.no_grad():
        # Initialize metrics
        test_accuracy = torchmetrics.Accuracy('binary').to(device)
        test_precision = torchmetrics.Precision('binary').to(device)
        test_recall = torchmetrics.Recall('binary').to(device)

        for data in test_loader:
            images, labels = data[0].float().to(device), data[1].float().to(device)
            outputs = model(images).squeeze()
            predicted = (outputs.data > 0.5).float().squeeze()

            # Loss
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            # Update metrics
            test_accuracy(predicted, labels)
            test_precision(predicted, labels)
            test_recall(predicted, labels)

        # Compute final test metrics
        final_test_acc = test_accuracy.compute()
        final_test_prec = test_precision.compute()
        final_test_recall = test_recall.compute()

        print(
            f"\nTest Accuracy: {final_test_acc}, Test Precision: {final_test_prec}, Test Recall: {final_test_recall}")

    # Plot metrics
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_acc_list, label='Training Accuracy')
    plt.plot(val_acc_list, label='Validation Accuracy')
    plt.plot(train_loss_list, label='Training Loss', linestyle='--')
    plt.plot(val_loss_list, label='Validation Loss', linestyle='--')
    plt.xlabel('Epochs')
    plt.ylabel('Score')
    plt.ylim(0, 1)
    plt.legend()

    # Plot metrics 2
    plt.subplot(1, 2, 2)
    plt.plot(val_prec_list, label='Validation Precision')
    plt.plot(val_recall_list, label='Validation Recall')
    plt.xlabel('Epochs')
    plt.ylabel('Score')
    plt.legend()
    plt.suptitle("(learning_rate={}, batch_size={}, audio_length={}, test_acc={}, test_prec={}, test_recall={})".format(
        learning_rate, batch_size, audio_length, final_test_acc, final_test_prec, final_test_recall))

    plt.show()


if __name__ == "__main__":
    main()
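
For reference, the 256 * 11 * 11 flattening in the classifier above corresponds to the 384×384 STFT inputs described in the Readme. A small, self-contained shape check (the layer stack simply mirrors the feature extractor above):

import torch
import torch.nn as nn

features = nn.Sequential(
    nn.Conv2d(1, 64, kernel_size=11, stride=4, padding=2), nn.ReLU(inplace=True),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nn.Conv2d(64, 192, kernel_size=5, padding=2), nn.ReLU(inplace=True),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nn.Conv2d(192, 384, kernel_size=3, padding=1), nn.ReLU(inplace=True),
    nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.ReLU(inplace=True),
    nn.Conv2d(256, 256, kernel_size=3, padding=1), nn.ReLU(inplace=True),
    nn.MaxPool2d(kernel_size=3, stride=2),
)

with torch.no_grad():
    out = features(torch.zeros(1, 1, 384, 384))
print(out.shape)  # torch.Size([1, 256, 11, 11]) -> 256 * 11 * 11 flattened features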

(A)-ResNet18

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torchmetrics
from torchsummary import summary
from sklearn.metrics import precision_score, recall_score
import random

os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Paths
train_folder_path = 'trainpath'
val_folder_path = 'valpath'
test_folder_path = 'testpath'

# Hyperparameters
num_epochs = 50
learning_rate = 0.00001
batch_size = 32
audio_length = 'for_description'  # free-text note shown in the plot title


class CustomDataset(Dataset):
    def __init__(self, file_list):
        self.file_list = file_list

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        with np.load(self.file_list[idx]) as data_file:
            stft = data_file['stft']
            lbls = 1 if data_file['labels'] == 'JA' else 0
        if stft.size == 0 or np.isnan(stft).any():
            print(f"Invalid data at index {idx}, trying another sample")
            return self.__getitem__(np.random.randint(len(self.file_list)))
        stft = (stft - np.mean(stft)) / (np.std(stft) + 1e-10)  # Z-score normalization
        return torch.tensor(stft, dtype=torch.float32), torch.tensor(lbls, dtype=torch.float32)


def main():
    train_files = [os.path.join(train_folder_path, file) for file in os.listdir(train_folder_path) if file.endswith(".npz")]
    val_files = [os.path.join(val_folder_path, file) for file in os.listdir(val_folder_path) if file.endswith(".npz")]
    test_files = [os.path.join(test_folder_path, file) for file in os.listdir(test_folder_path) if file.endswith(".npz")]

    # Create datasets
    train_data = CustomDataset(train_files)
    random.shuffle(val_files)
    random.shuffle(test_files)
    val_data = CustomDataset(val_files)
    test_data = CustomDataset(test_files)

    # Create DataLoaders
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=44)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=44)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=44)

    class BasicBlock(nn.Module):
        expansion = 1

        def __init__(self, in_planes, planes, stride=1):
            super(BasicBlock, self).__init__()
            self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
            self.bn1 = nn.BatchNorm2d(planes)
            self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
            self.bn2 = nn.BatchNorm2d(planes)

            self.shortcut = nn.Sequential()
            if stride != 1 or in_planes != self.expansion * planes:
                self.shortcut = nn.Sequential(
                    nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm2d(self.expansion * planes)
                )

        def forward(self, x):
            out = torch.relu(self.bn1(self.conv1(x)))
            out = self.bn2(self.conv2(out))
            out += self.shortcut(x)
            out = torch.relu(out)
            return out

    class CustomResNet18(nn.Module):
        def __init__(self, block, num_blocks, num_classes=1):
            super(CustomResNet18, self).__init__()
            self.in_planes = 64

            self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
            self.bn1 = nn.BatchNorm2d(64)
            self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
            self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
            self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
            self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
            self.linear = nn.Linear(73728, num_classes)  # 512 * 12 * 12 for a 384x384 input
            self.sigmoid = nn.Sigmoid()

        def _make_layer(self, block, planes, num_blocks, stride):
            strides = [stride] + [1] * (num_blocks - 1)
            layers = []
            for stride in strides:
                layers.append(block(self.in_planes, planes, stride))
                self.in_planes = planes * block.expansion
            return nn.Sequential(*layers)

        def forward(self, x):
            out = torch.relu(self.bn1(self.conv1(x)))
            out = self.layer1(out)
            out = self.layer2(out)
            out = self.layer3(out)
            out = self.layer4(out)
            out = F.avg_pool2d(out, 4)
            out = out.view(out.size(0), -1)
            out = self.linear(out)
            out = self.sigmoid(out)
            return out

    def custom_resnet18():
        return CustomResNet18(BasicBlock, [2, 2, 2, 2])

    # Create model
    model = custom_resnet18()

    # Train on GPU
    device = torch.device('cuda')
    model = model.to(device)

    # Model summary
    summary(model, (1, 384, 384))  # the network expects the 384x384 STFT inputs described in the Readme

    # Optimizer and loss
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.BCELoss()  # binary cross-entropy loss

    # Lists for metrics
    train_acc_list = []
    val_acc_list = []
    val_prec_list = []
    val_recall_list = []
    train_loss_list = []
    val_loss_list = []

    # Training
    for epoch in range(num_epochs):
        model.train()
        test_loss = 0.0
        correct = 0
        total = 0
        running_loss = 0.0

        # Define metrics
        accuracy = torchmetrics.Accuracy('binary').to(device)
        precision = torchmetrics.Precision('binary').to(device)
        recall = torchmetrics.Recall('binary').to(device)

        for i, data in enumerate(train_loader, 0):
            inputs, labels = data[0].float().to(device), data[1].float().to(device)
            optimizer.zero_grad()

            outputs = model(inputs).squeeze()
            predicted = (outputs.data > 0.5).float().squeeze()

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Optional gradient inspection
            for name, param in model.named_parameters():
                if param.requires_grad:
                    grad_mean = param.grad.mean() if param.grad is not None else 'No gradient'
                    # print(f"Layer: {name}, Weight: {param.data.mean()}, Gradient: {grad_mean}")

            running_loss += loss.item()

            # Calculate metrics
            train_acc = accuracy(predicted, labels)
            train_prec = precision(predicted, labels)
            train_recall = recall(predicted, labels)

            # Print progress
            print(
                f'Epoch [{epoch + 1}/{num_epochs}], Batch [{i + 1}/{len(train_loader)}], Train Loss: {loss.item()}, Train Accuracy: {train_acc}, Train Precision: {train_prec}, Train Recall: {train_recall}')

        # Reset validation metrics every epoch
        val_accuracy = torchmetrics.Accuracy('binary').to(device)
        val_precision = torchmetrics.Precision('binary').to(device)
        val_recall = torchmetrics.Recall('binary').to(device)
        val_running_loss = 0.0

        # Validation
        with torch.no_grad():
            for data in val_loader:
                images, labels = data[0].float().to(device), data[1].float().to(device)
                outputs = model(images).squeeze()
                predicted = (outputs.data > 0.5).float().squeeze()

                loss = criterion(outputs, labels)
                val_running_loss += loss.item()

                # Update validation metrics
                val_accuracy(predicted, labels)
                val_precision(predicted, labels)
                val_recall(predicted, labels)

        # Compute final validation metrics
        final_val_acc = val_accuracy.compute()
        final_val_prec = val_precision.compute()
        final_val_recall = val_recall.compute()

        # Save metrics after every epoch
        train_acc_list.append(train_acc.item())
        val_acc_list.append(final_val_acc.item())
        val_prec_list.append(final_val_prec.item())
        val_recall_list.append(final_val_recall.item())
        train_loss_list.append(running_loss / (i + 1))
        val_loss_list.append(val_running_loss / len(val_loader))

        print(
            f'Epoch {epoch + 1} val_loss: {val_running_loss / len(val_loader)}, val_acc: {final_val_acc}, val_prec: {final_val_prec}, val_recall: {final_val_recall}')

    # Test loop
    with torch.no_grad():
        # Initialize metrics
        test_accuracy = torchmetrics.Accuracy('binary').to(device)
        test_precision = torchmetrics.Precision('binary').to(device)
        test_recall = torchmetrics.Recall('binary').to(device)

        for data in test_loader:
            images, labels = data[0].float().to(device), data[1].float().to(device)
            outputs = model(images).squeeze()
            predicted = (outputs.data > 0.5).float().squeeze()

            # Loss
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            # Update metrics
            test_accuracy(predicted, labels)
            test_precision(predicted, labels)
            test_recall(predicted, labels)

        # Compute final test metrics
        final_test_acc = test_accuracy.compute()
        final_test_prec = test_precision.compute()
        final_test_recall = test_recall.compute()

        print(
            f"\nTest Accuracy: {final_test_acc}, Test Precision: {final_test_prec}, Test Recall: {final_test_recall}")

    # Plot metrics
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_acc_list, label='Training Accuracy')
    plt.plot(val_acc_list, label='Validation Accuracy')
    plt.plot(train_loss_list, label='Training Loss', linestyle='--')
    plt.plot(val_loss_list, label='Validation Loss', linestyle='--')
    plt.xlabel('Epochs')
    plt.ylabel('Score')
    plt.ylim(0, 1)
    plt.legend()

    # Plot metrics 2
    plt.subplot(1, 2, 2)
    plt.plot(val_prec_list, label='Validation Precision')
    plt.plot(val_recall_list, label='Validation Recall')
    plt.xlabel('Epochs')
    plt.ylabel('Score')
    plt.legend()
    plt.suptitle("(learning_rate={}, batch_size={}, audio_length={}, test_acc={}, test_prec={}, test_recall={})".format(
        learning_rate, batch_size, audio_length, final_test_acc, final_test_prec, final_test_recall))

    plt.show()


if __name__ == "__main__":
    main()
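
The otherwise unexplained 73728 in nn.Linear(73728, num_classes) is 512 * 12 * 12: a 384×384 input keeps its size through conv1 and layer1, is halved by each of the three stride-2 stages (384 → 192 → 96 → 48), and F.avg_pool2d(out, 4) then reduces 48×48 to 12×12. A quick arithmetic check:

size = 384                      # input height/width of the STFT spectrograms
for _ in range(3):              # layer2, layer3, layer4 each halve the resolution
    size //= 2
size //= 4                      # F.avg_pool2d(out, 4)
print(size, 512 * size * size)  # 12 73728 -> matches nn.Linear(73728, 1)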

(A)-VGG16

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torchmetrics
from torchsummary import summary
from sklearn.metrics import precision_score, recall_score
import random

os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Paths
train_folder_path = 'trainpath'
val_folder_path = 'valpath'
test_folder_path = 'testpath'

# Hyperparameters
num_epochs = 50
learning_rate = 0.00001
batch_size = 32
audio_length = 'for_description'  # free-text note shown in the plot title


class CustomDataset(Dataset):
    def __init__(self, file_list):
        self.file_list = file_list

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        with np.load(self.file_list[idx]) as data_file:
            stft = data_file['stft']
            lbls = 1 if data_file['labels'] == 'JA' else 0
        if stft.size == 0 or np.isnan(stft).any():
            print(f"Invalid data at index {idx}, trying another sample")
            return self.__getitem__(np.random.randint(len(self.file_list)))
        stft = (stft - np.mean(stft)) / (np.std(stft) + 1e-10)  # Z-score normalization
        return torch.tensor(stft, dtype=torch.float32), torch.tensor(lbls, dtype=torch.float32)


def main():
    train_files = [os.path.join(train_folder_path, file) for file in os.listdir(train_folder_path) if file.endswith(".npz")]
    val_files = [os.path.join(val_folder_path, file) for file in os.listdir(val_folder_path) if file.endswith(".npz")]
    test_files = [os.path.join(test_folder_path, file) for file in os.listdir(test_folder_path) if file.endswith(".npz")]

    # Create datasets
    train_data = CustomDataset(train_files)
    random.shuffle(val_files)
    random.shuffle(test_files)
    val_data = CustomDataset(val_files)
    test_data = CustomDataset(test_files)

    # Create DataLoaders
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=44)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=44)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=44)

    class Net(nn.Module):
        def __init__(self):  # VGG16
            super(Net, self).__init__()
            # Feature extraction layers
            self.features = nn.Sequential(
                # Block 1
                nn.Conv2d(1, 64, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(64, 64, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=2, stride=2),

                # Block 2
                nn.Conv2d(64, 128, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(128, 128, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=2, stride=2),

                # Block 3
                nn.Conv2d(128, 256, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(256, 256, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(256, 256, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=2, stride=2),

                # Block 4
                nn.Conv2d(256, 512, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=2, stride=2),

                # Block 5
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.Conv2d(512, 512, kernel_size=3, padding=1),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(kernel_size=2, stride=2),
            )

            self.classifier = nn.Sequential(
                nn.Linear(512 * 12 * 12, 4096),  # a 384x384 input is reduced to 12x12 by the five pooling stages
                nn.ReLU(inplace=True),
                nn.Dropout(),
                nn.Linear(4096, 4096),
                nn.ReLU(inplace=True),
                nn.Dropout(),
                nn.Linear(4096, 1),
                nn.Sigmoid(),
            )

        def forward(self, x):
            x = self.features(x)
            x = x.view(x.size(0), -1)
            x = self.classifier(x)
            return x

    # Create model
    model = Net()

    # Train on GPU
    device = torch.device('cuda')
    model = model.to(device)

    # Model summary
    summary(model, (1, 384, 384))  # the network expects the 384x384 STFT inputs described in the Readme

    # Optimizer and loss
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.BCELoss()  # binary cross-entropy loss

    # Lists for metrics
    train_acc_list = []
    val_acc_list = []
    val_prec_list = []
    val_recall_list = []
    train_loss_list = []
    val_loss_list = []

    # Training
    for epoch in range(num_epochs):
        model.train()
        test_loss = 0.0
        correct = 0
        total = 0
        running_loss = 0.0

        # Define metrics
        accuracy = torchmetrics.Accuracy('binary').to(device)
        precision = torchmetrics.Precision('binary').to(device)
        recall = torchmetrics.Recall('binary').to(device)

        for i, data in enumerate(train_loader, 0):
            inputs, labels = data[0].float().to(device), data[1].float().to(device)
            optimizer.zero_grad()

            outputs = model(inputs).squeeze()
            predicted = (outputs.data > 0.5).float().squeeze()

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Optional gradient inspection
            for name, param in model.named_parameters():
                if param.requires_grad:
                    grad_mean = param.grad.mean() if param.grad is not None else 'No gradient'
                    # print(f"Layer: {name}, Weight: {param.data.mean()}, Gradient: {grad_mean}")

            running_loss += loss.item()

            # Calculate metrics
            train_acc = accuracy(predicted, labels)
            train_prec = precision(predicted, labels)
            train_recall = recall(predicted, labels)

            # Print progress
            print(
                f'Epoch [{epoch + 1}/{num_epochs}], Batch [{i + 1}/{len(train_loader)}], Train Loss: {loss.item()}, Train Accuracy: {train_acc}, Train Precision: {train_prec}, Train Recall: {train_recall}')

        # Reset validation metrics every epoch
        val_accuracy = torchmetrics.Accuracy('binary').to(device)
        val_precision = torchmetrics.Precision('binary').to(device)
        val_recall = torchmetrics.Recall('binary').to(device)
        val_running_loss = 0.0

        # Validation
        with torch.no_grad():
            for data in val_loader:
                images, labels = data[0].float().to(device), data[1].float().to(device)
                outputs = model(images).squeeze()
                predicted = (outputs.data > 0.5).float().squeeze()

                loss = criterion(outputs, labels)
                val_running_loss += loss.item()

                # Update validation metrics
                val_accuracy(predicted, labels)
                val_precision(predicted, labels)
                val_recall(predicted, labels)

        # Compute final validation metrics
        final_val_acc = val_accuracy.compute()
        final_val_prec = val_precision.compute()
        final_val_recall = val_recall.compute()

        # Save metrics after every epoch
        train_acc_list.append(train_acc.item())
        val_acc_list.append(final_val_acc.item())
        val_prec_list.append(final_val_prec.item())
        val_recall_list.append(final_val_recall.item())
        train_loss_list.append(running_loss / (i + 1))
        val_loss_list.append(val_running_loss / len(val_loader))

        print(
            f'Epoch {epoch + 1} val_loss: {val_running_loss / len(val_loader)}, val_acc: {final_val_acc}, val_prec: {final_val_prec}, val_recall: {final_val_recall}')

    # Test loop
    with torch.no_grad():
        # Initialize metrics
        test_accuracy = torchmetrics.Accuracy('binary').to(device)
        test_precision = torchmetrics.Precision('binary').to(device)
        test_recall = torchmetrics.Recall('binary').to(device)

        for data in test_loader:
            images, labels = data[0].float().to(device), data[1].float().to(device)
            outputs = model(images).squeeze()
            predicted = (outputs.data > 0.5).float().squeeze()

            # Loss
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            # Update metrics
            test_accuracy(predicted, labels)
            test_precision(predicted, labels)
            test_recall(predicted, labels)

        # Compute final test metrics
        final_test_acc = test_accuracy.compute()
        final_test_prec = test_precision.compute()
        final_test_recall = test_recall.compute()

        print(
            f"\nTest Accuracy: {final_test_acc}, Test Precision: {final_test_prec}, Test Recall: {final_test_recall}")

    # Plot metrics
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_acc_list, label='Training Accuracy')
    plt.plot(val_acc_list, label='Validation Accuracy')
    plt.plot(train_loss_list, label='Training Loss', linestyle='--')
    plt.plot(val_loss_list, label='Validation Loss', linestyle='--')
    plt.xlabel('Epochs')
    plt.ylabel('Score')
    plt.ylim(0, 1)
    plt.legend()

    # Plot metrics 2
    plt.subplot(1, 2, 2)
    plt.plot(val_prec_list, label='Validation Precision')
    plt.plot(val_recall_list, label='Validation Recall')
    plt.xlabel('Epochs')
    plt.ylabel('Score')
    plt.legend()
    plt.suptitle("(learning_rate={}, batch_size={}, audio_length={}, test_acc={}, test_prec={}, test_recall={})".format(
        learning_rate, batch_size, audio_length, final_test_acc, final_test_prec, final_test_recall))

    plt.show()


if __name__ == "__main__":
    main()

dataprocessing

import numpy as np
import librosa
import os

parent_dir = ''  # root directory with one subfolder per label (e.g. JA / NEIN)
output_dir = ''  # directory for the generated .npz files


def process_files(directory, label, target_length=196600):
    stft_list = []
    labels = []
    file_counter = 0
    npz_file_counter = 0
    files = os.listdir(directory)

    for i, filename in enumerate(files):
        try:
            file_path = os.path.join(directory, filename)

            print(f"Processing file: {filename}")

            y, sr = librosa.load(file_path, sr=48000, dtype=np.float32)

            for start in range(0, len(y), target_length):
                y_chunk = y[start:start + target_length]

                # Skip chunks shorter than target_length
                if len(y_chunk) < target_length:
                    continue

                stft = np.abs(librosa.stft(y_chunk)).astype(np.float32)

                stft = stft[470:854]  # crop the spectrogram to 384 frequency bins
                print(np.shape(stft))

                print(stft.dtype)
                stft_list.append(stft.tolist())
                labels.append(label)

                npz_file_counter += 1
                npz_filename = os.path.join(output_dir, f'{label}_stft_data_{npz_file_counter}.npz')
                print(f"Saving {npz_filename}")
                print(np.shape(stft))
                if stft_list:  # save only if the list is not empty
                    np.savez(npz_filename, stft=np.array(stft_list, dtype=np.float32), labels=np.array(labels))
                    stft_list = []
                    labels = []

            file_counter += 1

        except Exception as e:
            print(f"Error processing {filename}: {e}")


for subfolder in os.listdir(parent_dir):
    subfolder_path = os.path.join(parent_dir, subfolder)
    if os.path.isdir(subfolder_path):
        process_files(subfolder_path, subfolder)
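
With librosa's default STFT parameters (n_fft=2048, hop_length=512, center=True), a 196600-sample chunk at 48 kHz gives 1025 frequency bins and 1 + 196600 // 512 = 384 frames; cropping rows 470:854 then leaves the 384×384 spectrograms described in the Readme. A quick check on a synthetic signal (illustrative only):

import numpy as np
import librosa

y_chunk = np.random.randn(196600).astype(np.float32)  # synthetic stand-in for one audio chunk
stft = np.abs(librosa.stft(y_chunk)).astype(np.float32)
print(stft.shape)            # (1025, 384)
print(stft[470:854].shape)   # (384, 384) after cropping the frequency axis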

mixup technique

import numpy as np
import os
import random


def mix_spectrograms(Leakfolder, Backgroundfolder, output_folder, label, max_files=2000, start_index=0):
    spec_files1 = [f for f in os.listdir(Leakfolder) if f.endswith('.npz')]
    spec_files2 = [f for f in os.listdir(Backgroundfolder) if f.endswith('.npz')]

    min_files = min(len(spec_files1), len(spec_files2))
    if max_files is not None:
        min_files = min(min_files, max_files)

    random.shuffle(spec_files1)
    random.shuffle(spec_files2)

    for i in range(min_files):
        spec1 = np.load(os.path.join(Leakfolder, spec_files1[i]))['stft']
        spec2 = np.load(os.path.join(Backgroundfolder, spec_files2[i]))['stft']

        lam = 0.5  # mixup coefficient

        mixed_spec = lam * spec1 + (1 - lam) * spec2

        # Convert to float32
        mixed_spec_float32 = mixed_spec.astype(np.float32)

        output_filename = f"mixed_spec_{start_index + i}.npz"
        np.savez(os.path.join(output_folder, output_filename), stft=np.expand_dims(mixed_spec_float32, 0), labels=np.array([label]))


# Example call (placeholder paths)
mix_spectrograms('leakfiledirectory',
                 'backgroundfiledirectory',
                 'outputdirectory', 'label (JA/NEIN)')
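
The mixed files can be loaded back exactly like the unmixed ones; a minimal sketch (the file name and output directory below are illustrative placeholders):

import numpy as np

with np.load('outputdirectory/mixed_spec_0.npz') as data_file:  # illustrative path
    stft = data_file['stft']      # mixed magnitude spectrogram, float32
    labels = data_file['labels']  # e.g. array(['JA']) or array(['NEIN'])
print(stft.shape, labels)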

rebuild_johnsonetal

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torchmetrics
from torchsummary import summary
from sklearn.metrics import precision_score, recall_score
import random

os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# Paths
train_folder_path = 'trainpath'
val_folder_path = 'valpath'
test_folder_path = 'testpath'

# Hyperparameters
num_epochs = 50
learning_rate = 0.00001
batch_size = 32
audio_length = 'for_description'  # free-text note shown in the plot title


class CustomDataset(Dataset):
    def __init__(self, file_list):
        self.file_list = file_list

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        with np.load(self.file_list[idx]) as data_file:
            stft = data_file['stft']
            lbls = 1 if data_file['labels'] == 'JA' else 0
        if stft.size == 0 or np.isnan(stft).any():
            print(f"Invalid data at index {idx}, trying another sample")
            return self.__getitem__(np.random.randint(len(self.file_list)))
        stft = (stft - np.mean(stft)) / (np.std(stft) + 1e-10)  # Z-score normalization
        return torch.tensor(stft, dtype=torch.float32), torch.tensor(lbls, dtype=torch.float32)


def main():
    train_files = [os.path.join(train_folder_path, file) for file in os.listdir(train_folder_path) if file.endswith(".npz")]
    val_files = [os.path.join(val_folder_path, file) for file in os.listdir(val_folder_path) if file.endswith(".npz")]
    test_files = [os.path.join(test_folder_path, file) for file in os.listdir(test_folder_path) if file.endswith(".npz")]

    # Create datasets
    train_data = CustomDataset(train_files)
    random.shuffle(val_files)
    random.shuffle(test_files)
    val_data = CustomDataset(val_files)
    test_data = CustomDataset(test_files)

    # Create DataLoaders
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=44)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=44)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=44)

    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 6, kernel_size=3, padding=1)
            self.pool = nn.MaxPool2d(2, 2)
            self.dropout1 = nn.Dropout(0.3)

            self.conv2 = nn.Conv2d(6, 12, kernel_size=3, padding=1)
            self.conv3 = nn.Conv2d(12, 24, kernel_size=3, padding=1)

            self.fc1 = nn.Linear(24 * 128 * 3, 96)  # matches a 1025x24 STFT input after three 2x2 pools
            self.dropout2 = nn.Dropout(0.3)

            self.fc2 = nn.Linear(96, 1)

        def forward(self, x):
            x = x + torch.randn_like(x) * 0.5  # additive Gaussian noise
            x = self.pool(F.relu(self.conv1(x)))
            x = self.dropout1(x)

            x = self.pool(F.relu(self.conv2(x)))
            x = self.dropout1(x)

            x = self.pool(F.relu(self.conv3(x)))

            x = x.view(-1, self.num_flat_features(x))
            x = F.relu(self.fc1(x))
            x = self.dropout2(x)

            x = self.fc2(x)
            return torch.sigmoid(x)

        def num_flat_features(self, x):
            size = x.size()[1:]  # all dimensions except the batch dimension
            num_features = 1
            for s in size:
                num_features *= s
            return num_features

    # Create model
    model = Net()

    # Train on GPU
    device = torch.device('cuda')
    model = model.to(device)

    # Model summary
    summary(model, (1, 1025, 24))

    # Optimizer and loss
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.BCELoss()  # binary cross-entropy loss

    # Lists for metrics
    train_acc_list = []
    val_acc_list = []
    val_prec_list = []
    val_recall_list = []
    train_loss_list = []
    val_loss_list = []

    # Training
    for epoch in range(num_epochs):
        model.train()
        test_loss = 0.0
        correct = 0
        total = 0
        running_loss = 0.0

        # Define metrics
        accuracy = torchmetrics.Accuracy('binary').to(device)
        precision = torchmetrics.Precision('binary').to(device)
        recall = torchmetrics.Recall('binary').to(device)

        for i, data in enumerate(train_loader, 0):
            inputs, labels = data[0].float().to(device), data[1].float().to(device)
            optimizer.zero_grad()

            outputs = model(inputs).squeeze()
            predicted = (outputs.data > 0.5).float().squeeze()

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Optional gradient inspection
            for name, param in model.named_parameters():
                if param.requires_grad:
                    grad_mean = param.grad.mean() if param.grad is not None else 'No gradient'
                    # print(f"Layer: {name}, Weight: {param.data.mean()}, Gradient: {grad_mean}")

            running_loss += loss.item()

            # Calculate metrics
            train_acc = accuracy(predicted, labels)
            train_prec = precision(predicted, labels)
            train_recall = recall(predicted, labels)

            # Print progress
            print(
                f'Epoch [{epoch + 1}/{num_epochs}], Batch [{i + 1}/{len(train_loader)}], Train Loss: {loss.item()}, Train Accuracy: {train_acc}, Train Precision: {train_prec}, Train Recall: {train_recall}')

        # Reset validation metrics every epoch
        val_accuracy = torchmetrics.Accuracy('binary').to(device)
        val_precision = torchmetrics.Precision('binary').to(device)
        val_recall = torchmetrics.Recall('binary').to(device)
        val_running_loss = 0.0

        # Validation
        with torch.no_grad():
            for data in val_loader:
                images, labels = data[0].float().to(device), data[1].float().to(device)
                outputs = model(images).squeeze()
                predicted = (outputs.data > 0.5).float().squeeze()

                loss = criterion(outputs, labels)
                val_running_loss += loss.item()

                # Update validation metrics
                val_accuracy(predicted, labels)
                val_precision(predicted, labels)
                val_recall(predicted, labels)

        # Compute final validation metrics
        final_val_acc = val_accuracy.compute()
        final_val_prec = val_precision.compute()
        final_val_recall = val_recall.compute()

        # Save metrics after every epoch
        train_acc_list.append(train_acc.item())
        val_acc_list.append(final_val_acc.item())
        val_prec_list.append(final_val_prec.item())
        val_recall_list.append(final_val_recall.item())
        train_loss_list.append(running_loss / (i + 1))
        val_loss_list.append(val_running_loss / len(val_loader))

        print(
            f'Epoch {epoch + 1} val_loss: {val_running_loss / len(val_loader)}, val_acc: {final_val_acc}, val_prec: {final_val_prec}, val_recall: {final_val_recall}')

    # Test loop
    with torch.no_grad():
        # Initialize metrics
        test_accuracy = torchmetrics.Accuracy('binary').to(device)
        test_precision = torchmetrics.Precision('binary').to(device)
        test_recall = torchmetrics.Recall('binary').to(device)

        for data in test_loader:
            images, labels = data[0].float().to(device), data[1].float().to(device)
            outputs = model(images).squeeze()
            predicted = (outputs.data > 0.5).float().squeeze()

            # Loss
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            # Update metrics
            test_accuracy(predicted, labels)
            test_precision(predicted, labels)
            test_recall(predicted, labels)

        # Compute final test metrics
        final_test_acc = test_accuracy.compute()
        final_test_prec = test_precision.compute()
        final_test_recall = test_recall.compute()

        print(
            f"\nTest Accuracy: {final_test_acc}, Test Precision: {final_test_prec}, Test Recall: {final_test_recall}")

    # Plot metrics
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_acc_list, label='Training Accuracy')
    plt.plot(val_acc_list, label='Validation Accuracy')
    plt.plot(train_loss_list, label='Training Loss', linestyle='--')
    plt.plot(val_loss_list, label='Validation Loss', linestyle='--')
    plt.xlabel('Epochs')
    plt.ylabel('Score')
    plt.ylim(0, 1)
    plt.legend()

    # Plot metrics 2
    plt.subplot(1, 2, 2)
    plt.plot(val_prec_list, label='Validation Precision')
    plt.plot(val_recall_list, label='Validation Recall')
    plt.xlabel('Epochs')
    plt.ylabel('Score')
    plt.legend()
    plt.suptitle("(learning_rate={}, batch_size={}, audio_length={}, test_acc={}, test_prec={}, test_recall={})".format(
        learning_rate, batch_size, audio_length, final_test_acc, final_test_prec, final_test_recall))

    plt.show()


if __name__ == "__main__":
    main()
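
Unlike the (A)-models above, this rebuild keeps the full 1025-bin STFT with 24 time frames: the three 2×2 pooling stages reduce 1025 → 128 and 24 → 3, which is why fc1 expects 24 * 128 * 3 features and the summary call uses an input size of (1, 1025, 24). A quick arithmetic check:

h, w = 1025, 24
for _ in range(3):          # three MaxPool2d(2, 2) stages
    h, w = h // 2, w // 2
print(h, w, 24 * h * w)     # 128 3 9216 -> matches nn.Linear(24 * 128 * 3, 96)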

Further Projects

CSE Institute: SmOP Project

SmOP – Smart Overpressure Protection Device

Intelligent protection of chemical reactors with adaptive safety devices.

CSE Institute: RiIM Project

RiIM – Risk-based Pipeline Integrity Management

Development of an AI-supported, risk-based pipeline integrity management system.

CSE Institute: EuroValve Project

EuroValve – European Program on Evaluation of Safety Valve Stability

Development of a new safety criterion for safety valves.
