Readme
This repository accompanies the publication titled "Enhancing acoustic leak detection with Data augmentation: overcoming background noise challenges". It contains the source code and resources outlined below:
- STFT Data Processing: Code to process audio inputs into 384×384 labeled STFT spectrograms.
- Audio Augmentation: Methods for the linear combination of background noise with leak sounds (a short sketch follows this overview).
- CNNs for Classification:
  - Johnson et al. (2020) Network: Rebuild of the model from Johnson et al. (2020)
  - (A)-AlexNet: Modified AlexNet tailored to 384×384 input
  - (A)-VGG16: Modified VGG16 tailored to 384×384 input
  - (A)-ResNet18: Modified ResNet18 tailored to 384×384 input
Each section of the repository is designed to support different stages of the audio processing and machine learning workflow detailed in our publication.
For further details, please refer to the individual directories and documentation within the repository.
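As a quick orientation, the audio augmentation listed above is a mixup-style linear combination of a leak spectrogram with a background-noise spectrogram. A minimal sketch of the idea (the arrays below are placeholders; the actual implementation is in the mixup technique section further down):

import numpy as np

leak_stft = np.random.rand(384, 384).astype(np.float32)        # placeholder for a leak STFT spectrogram
background_stft = np.random.rand(384, 384).astype(np.float32)  # placeholder for a background-noise STFT spectrogram
lam = 0.5                                                       # mixing coefficient
mixed_stft = lam * leak_stft + (1 - lam) * background_stft      # element-wise blend, same shape as the inputs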
(A)-AlexNet
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torchmetrics
from torchsummary import summary
from sklearn.metrics import precision_score, recall_score
import random
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
# Paths
train_folder_path = 'trainpath'
val_folder_path = 'valpath'
test_folder_path = 'testpath'
num_epochs = 50
learning_rate = 0.00001
batch_size = 32
audio_length = 'for_description'
class CustomDataset(Dataset):
def __init__(self, file_list):
self.file_list = file_list
def __len__(self):
return len(self.file_list)
def __getitem__(self, idx):
with np.load(self.file_list[idx]) as data_file:
stft = data_file['stft']
lbls = 1 if data_file['labels'] == 'JA' else 0
if stft.size == 0 or np.isnan(stft).any():
print(f"Invalid data at index {idx}, trying another sample")
return self.__getitem__(np.random.randint(len(self.file_list)))
stft = (stft - np.mean(stft)) / (np.std(stft) + 1e-10)  # z-score normalization
return torch.tensor(stft, dtype=torch.float32), torch.tensor(lbls, dtype=torch.float32)
def main():
train_files = [os.path.join(train_folder_path, file) for file in os.listdir(train_folder_path) if file.endswith(".npz")]
val_files = [os.path.join(val_folder_path, file) for file in os.listdir(val_folder_path) if file.endswith(".npz")]
test_files = [os.path.join(test_folder_path, file) for file in os.listdir(test_folder_path) if file.endswith(".npz")]
# Create Datasets
train_data = CustomDataset(train_files)
random.shuffle(val_files)
random.shuffle(test_files)
val_data = CustomDataset(val_files)
test_data = CustomDataset(test_files)
# Create DataLoader
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=44)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers = 44)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=44)
class Net(nn.Module):
def __init__(self): #AlexNET
super(Net, self).__init__()
# Feature extraction layers
self.features = nn.Sequential(
nn.Conv2d(1, 64, kernel_size=11, stride=4, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
nn.Conv2d(64, 192, kernel_size=5, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
nn.Conv2d(192, 384, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(384, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(256, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2),
)
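# With 384x384 single-channel inputs, this feature extractor outputs 256 maps of size 11x11,
# which is why the classifier below flattens 256 * 11 * 11 features.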
# Fully connected layers
self.classifier = nn.Sequential(
nn.Dropout(),
nn.Linear(256 * 11 * 11, 4096),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Linear(4096, 4096),
nn.ReLU(inplace=True),
nn.Linear(4096, 1),
nn.Sigmoid() # Sigmoid activation for binary classification
)
def forward(self, x):
x = self.features(x)
x = x.view(x.size(0), 256 * 11 * 11)
x = self.classifier(x)
return x
# Create Model
model = Net()
# Train GPU
device = torch.device('cuda')
model = model.to(device)
# Summary Model
summary(model, (1, 384, 384))
# Optimizer and Loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCELoss()  # binary cross-entropy loss
# Lists for Metrics
train_acc_list = []
val_acc_list = []
val_prec_list = []
val_recall_list = []
train_loss_list = []
val_loss_list = []
# Training
for epoch in range(num_epochs):
model.train()
test_loss = 0.0
correct = 0
total = 0
running_loss = 0.0
# Define Metrics
accuracy = torchmetrics.Accuracy('binary').to(device)
precision = torchmetrics.Precision('binary').to(device)
recall = torchmetrics.Recall('binary').to(device)
for i, data in enumerate(train_loader, 0):
inputs, labels = data[0].float().to(device), data[1].float().to(device)
optimizer.zero_grad()
outputs = model(inputs).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
for name, param in model.named_parameters():
if param.requires_grad:
grad_mean = param.grad.mean() if param.grad is not None else 'No gradient'
# print(f"Layer: {name}, Weight: {param.data.mean()}, Gradient: {grad_mean}")
running_loss += loss.item()
# Calculate Metrics
train_acc = accuracy(predicted, labels)
train_prec = precision(predicted, labels)
train_recall = recall(predicted, labels)
# Print Progress
print(
f'Epoch [{epoch + 1}/{num_epochs}], Batch [{i + 1}/{len(train_loader)}], Train Loss: {loss.item()}, Train Accuracy: {train_acc}, Train Precision: {train_prec}, Train Recall: {train_recall}')
# Reset validation metrics after every epoch
val_accuracy = torchmetrics.Accuracy('binary').to(device)
val_precision = torchmetrics.Precision('binary').to(device)
val_recall = torchmetrics.Recall('binary').to(device)
val_running_loss = 0.0
# Validation
with torch.no_grad():
for data in val_loader:
images, labels = data[0].float().to(device), data[1].float().to(device)
outputs = model(images).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
loss = criterion(outputs, labels)
val_running_loss += loss.item()
# update val-metrics
val_accuracy(predicted, labels)
val_precision(predicted, labels)
val_recall(predicted, labels)
# Calculate final val-metrics
final_val_acc = val_accuracy.compute()
final_val_prec = val_precision.compute()
final_val_recall = val_recall.compute()
# Saves metrics after every epoch
train_acc_list.append(train_acc.item())
val_acc_list.append(final_val_acc.item())
val_prec_list.append(final_val_prec.item())
val_recall_list.append(final_val_recall.item())
train_loss_list.append(running_loss / (i + 1))
val_loss_list.append(val_running_loss / len(val_loader))
print(
f'Epoch {epoch + 1} val_loss: {val_running_loss / len(val_loader)}, val_acc: {final_val_acc}, val_prec: {final_val_prec}, val_recall: {final_val_recall}')
# test loop
with torch.no_grad():
# initialise metrics
test_accuracy = torchmetrics.Accuracy('binary').to(device)
test_precision = torchmetrics.Precision('binary').to(device)
test_recall = torchmetrics.Recall('binary').to(device)
for data in test_loader:
images, labels = data[0].float().to(device), data[1].float().to(device)
outputs = model(images).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
# loss
loss = criterion(outputs, labels)
test_loss += loss.item()
# update metrics
test_accuracy(predicted, labels)
test_precision(predicted, labels)
test_recall(predicted, labels)
# calculate final metrics
final_test_acc = test_accuracy.compute()
final_test_prec = test_precision.compute()
final_test_recall = test_recall.compute()
print(
f"\nTest Accuracy: {final_test_acc}, Test Precision: {final_test_prec}, Test Recall: {final_test_recall}")
# plot metrics
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_acc_list, label='Training Accuracy')
plt.plot(val_acc_list, label='Validation Accuracy')
plt.plot(train_loss_list, label='Training Loss', linestyle='--')
plt.plot(val_loss_list, label='Validation Loss', linestyle='--')
plt.xlabel('Epochs')
plt.ylabel('Score')
plt.ylim(0, 1)
plt.legend()
# plot metrics 2
plt.subplot(1, 2, 2)
plt.plot(val_prec_list, label='Validation Precision')
plt.plot(val_recall_list, label='Validation Recall')
plt.xlabel('Epochs')
plt.ylabel('Score')
plt.legend()
plt.suptitle("(learning_rate={}, batch_size={}, audio_length={}, test_acc={}, test_prec={}, test_recall={})".format(
learning_rate, batch_size, audio_length, final_test_acc, final_test_prec, final_test_recall))
plt.show()
if __name__ == "__main__":
main()
(A)-ResNet18
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torchmetrics
from torchsummary import summary
from sklearn.metrics import precision_score, recall_score
import random
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
# Paths
train_folder_path = 'trainpath'
val_folder_path = 'valpath'
test_folder_path = 'testpath'
num_epochs = 50
learning_rate = 0.00001
batch_size = 32
audio_length = 'for_description'
class CustomDataset(Dataset):
def __init__(self, file_list):
self.file_list = file_list
def __len__(self):
return len(self.file_list)
def __getitem__(self, idx):
with np.load(self.file_list[idx]) as data_file:
stft = data_file['stft']
lbls = 1 if data_file['labels'] == 'JA' else 0
if stft.size == 0 or np.isnan(stft).any():
print(f"Invalid data at index {idx}, trying another sample")
return self.__getitem__(np.random.randint(len(self.file_list)))
stft = (stft - np.mean(stft)) / (np.std(stft) + 1e-10)  # z-score normalization
return torch.tensor(stft, dtype=torch.float32), torch.tensor(lbls, dtype=torch.float32)
def main():
train_files = [os.path.join(train_folder_path, file) for file in os.listdir(train_folder_path) if file.endswith(".npz")]
val_files = [os.path.join(val_folder_path, file) for file in os.listdir(val_folder_path) if file.endswith(".npz")]
test_files = [os.path.join(test_folder_path, file) for file in os.listdir(test_folder_path) if file.endswith(".npz")]
# Create Datasets
train_data = CustomDataset(train_files)
random.shuffle(val_files)
random.shuffle(test_files)
val_data = CustomDataset(val_files)
test_data = CustomDataset(test_files)
# Create DataLoader
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=44)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers = 44)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=44)
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, in_planes, planes, stride=1):
super(BasicBlock, self).__init__()
self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(planes)
self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(planes)
self.shortcut = nn.Sequential()
if stride != 1 or in_planes != self.expansion * planes:
self.shortcut = nn.Sequential(
nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(self.expansion * planes)
)
def forward(self, x):
out = torch.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out += self.shortcut(x)
out = torch.relu(out)
return out
class CustomResNet18(nn.Module):
def __init__(self, block, num_blocks, num_classes=1):
super(CustomResNet18, self).__init__()
self.in_planes = 64
self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(64)
self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
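# With 384x384 inputs, the stride-1 stem and three stride-2 stages leave 512 maps of 48x48;
# the 4x4 average pooling in forward() reduces them to 12x12, hence the 512 * 12 * 12 = 73728 input features below.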
self.linear = nn.Linear(73728, num_classes)
self.sigmoid = nn.Sigmoid()
def _make_layer(self, block, planes, num_blocks, stride):
strides = [stride] + [1] * (num_blocks - 1)
layers = []
for stride in strides:
layers.append(block(self.in_planes, planes, stride))
self.in_planes = planes * block.expansion
return nn.Sequential(*layers)
def forward(self, x):
out = torch.relu(self.bn1(self.conv1(x)))
out = self.layer1(out)
out = self.layer2(out)
out = self.layer3(out)
out = self.layer4(out)
out = F.avg_pool2d(out, 4)
out = out.view(out.size(0), -1)
out = self.linear(out)
out = self.sigmoid(out)
return out
def custom_resnet18():
return CustomResNet18(BasicBlock, [2, 2, 2, 2])
# Create Model
model = custom_resnet18()
# Train GPU
device = torch.device('cuda')
model = model.to(device)
# Summary Model
summary(model, (1, 384, 384))
# Optimizer and Loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCELoss()  # binary cross-entropy loss
# Lists for Metrics
train_acc_list = []
val_acc_list = []
val_prec_list = []
val_recall_list = []
train_loss_list = []
val_loss_list = []
# Training
for epoch in range(num_epochs):
model.train()
test_loss = 0.0
correct = 0
total = 0
running_loss = 0.0
# Define Metrics
accuracy = torchmetrics.Accuracy('binary').to(device)
precision = torchmetrics.Precision('binary').to(device)
recall = torchmetrics.Recall('binary').to(device)
for i, data in enumerate(train_loader, 0):
inputs, labels = data[0].float().to(device), data[1].float().to(device)
optimizer.zero_grad()
outputs = model(inputs).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
for name, param in model.named_parameters():
if param.requires_grad:
grad_mean = param.grad.mean() if param.grad is not None else 'No gradient'
# print(f"Layer: {name}, Weight: {param.data.mean()}, Gradient: {grad_mean}")
running_loss += loss.item()
# Calculate Metrics
train_acc = accuracy(predicted, labels)
train_prec = precision(predicted, labels)
train_recall = recall(predicted, labels)
# Print Progress
print(
f'Epoch [{epoch + 1}/{num_epochs}], Batch [{i + 1}/{len(train_loader)}], Train Loss: {loss.item()}, Train Accuracy: {train_acc}, Train Precision: {train_prec}, Train Recall: {train_recall}')
# Reset validation metrics after every epoch
val_accuracy = torchmetrics.Accuracy('binary').to(device)
val_precision = torchmetrics.Precision('binary').to(device)
val_recall = torchmetrics.Recall('binary').to(device)
val_running_loss = 0.0
# Validation
with torch.no_grad():
for data in val_loader:
images, labels = data[0].float().to(device), data[1].float().to(device)
outputs = model(images).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
loss = criterion(outputs, labels)
val_running_loss += loss.item()
# val-metric update
val_accuracy(predicted, labels)
val_precision(predicted, labels)
val_recall(predicted, labels)
# Calculate final val-metrics
final_val_acc = val_accuracy.compute()
final_val_prec = val_precision.compute()
final_val_recall = val_recall.compute()
# Saves metrics after every epoch
train_acc_list.append(train_acc.item())
val_acc_list.append(final_val_acc.item())
val_prec_list.append(final_val_prec.item())
val_recall_list.append(final_val_recall.item())
train_loss_list.append(running_loss / (i + 1))
val_loss_list.append(val_running_loss / len(val_loader))
print(
f'Epoch {epoch + 1} val_loss: {val_running_loss / len(val_loader)}, val_acc: {final_val_acc}, val_prec: {final_val_prec}, val_recall: {final_val_recall}')
# test loop
with torch.no_grad():
# initialise metrics
test_accuracy = torchmetrics.Accuracy('binary').to(device)
test_precision = torchmetrics.Precision('binary').to(device)
test_recall = torchmetrics.Recall('binary').to(device)
for data in test_loader:
images, labels = data[0].float().to(device), data[1].float().to(device)
outputs = model(images).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
# loss
loss = criterion(outputs, labels)
test_loss += loss.item()
# update metrics
test_accuracy(predicted, labels)
test_precision(predicted, labels)
test_recall(predicted, labels)
# calculate final metrics
final_test_acc = test_accuracy.compute()
final_test_prec = test_precision.compute()
final_test_recall = test_recall.compute()
print(
f"\nTest Accuracy: {final_test_acc}, Test Precision: {final_test_prec}, Test Recall: {final_test_recall}")
# plot metrics
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_acc_list, label='Training Accuracy')
plt.plot(val_acc_list, label='Validation Accuracy')
plt.plot(train_loss_list, label='Training Loss', linestyle='--')
plt.plot(val_loss_list, label='Validation Loss', linestyle='--')
plt.xlabel('Epochs')
plt.ylabel('Score')
plt.ylim(0, 1)
plt.legend()
# plot metrics 2
plt.subplot(1, 2, 2)
plt.plot(val_prec_list, label='Validation Precision')
plt.plot(val_recall_list, label='Validation Recall')
plt.xlabel('Epochs')
plt.ylabel('Score')
plt.legend()
plt.suptitle("(learning_rate={}, batch_size={}, audio_length={}, test_acc={}, test_prec={}, test_recall={})".format(
learning_rate, batch_size, audio_length, final_test_acc, final_test_prec, final_test_recall))
plt.show()
if __name__ == "__main__":
main()
(A)-VGG16
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torchmetrics
from torchsummary import summary
from sklearn.metrics import precision_score, recall_score
import random
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
# Paths
train_folder_path = 'trainpath'
val_folder_path = 'valpath'
test_folder_path = 'testpath'
num_epochs = 50
learning_rate = 0.00001
batch_size = 32
audio_length = 'for_description'
class CustomDataset(Dataset):
def __init__(self, file_list):
self.file_list = file_list
def __len__(self):
return len(self.file_list)
def __getitem__(self, idx):
with np.load(self.file_list[idx]) as data_file:
stft = data_file['stft']
lbls = 1 if data_file['labels'] == 'JA' else 0
if stft.size == 0 or np.isnan(stft).any():
print(f"Invalid data at index {idx}, trying another sample")
return self.__getitem__(np.random.randint(len(self.file_list)))
stft = (stft - np.mean(stft)) / (np.std(stft) + 1e-10)  # z-score normalization
return torch.tensor(stft, dtype=torch.float32), torch.tensor(lbls, dtype=torch.float32)
def main():
train_files = [os.path.join(train_folder_path, file) for file in os.listdir(train_folder_path) if file.endswith(".npz")]
val_files = [os.path.join(val_folder_path, file) for file in os.listdir(val_folder_path) if file.endswith(".npz")]
test_files = [os.path.join(test_folder_path, file) for file in os.listdir(test_folder_path) if file.endswith(".npz")]
# Create Datasets
train_data = CustomDataset(train_files)
random.shuffle(val_files)
random.shuffle(test_files)
val_data = CustomDataset(val_files)
test_data = CustomDataset(test_files)
# Create DataLoader
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=44)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers = 44)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=44)
class Net(nn.Module):
def __init__(self): #VGG16
super(Net, self).__init__()
# Feature extraction layers
self.features = nn.Sequential(
# Block 1
nn.Conv2d(1, 64, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(64, 64, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
# Block 2
nn.Conv2d(64, 128, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(128, 128, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
# Block 3
nn.Conv2d(128, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(256, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(256, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
# Block 4
nn.Conv2d(256, 512, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
# Block 5
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(512, 512, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
)
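# Five 2x2 max-pooling stages reduce a 384x384 input to 12x12,
# so the classifier below flattens 512 * 12 * 12 features.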
self.classifier = nn.Sequential(
nn.Linear(512 * 12 * 12, 4096),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Linear(4096, 4096),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Linear(4096, 1),
nn.Sigmoid(),
)
def forward(self, x):
x = self.features(x)
x = x.view(x.size(0), -1)
x = self.classifier(x)
return x
# Create Model
model = Net()
# Train GPU
device = torch.device('cuda')
model = model.to(device)
# Summary Model
summary(model, (1, 384, 384))
# Optimizer and Loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCELoss()  # binary cross-entropy loss
# Lists for Metrics
train_acc_list = []
val_acc_list = []
val_prec_list = []
val_recall_list = []
train_loss_list = []
val_loss_list = []
# Training
for epoch in range(num_epochs):
model.train()
test_loss = 0.0
correct = 0
total = 0
running_loss = 0.0
# Define Metrics
accuracy = torchmetrics.Accuracy('binary').to(device)
precision = torchmetrics.Precision('binary').to(device)
recall = torchmetrics.Recall('binary').to(device)
for i, data in enumerate(train_loader, 0):
inputs, labels = data[0].float().to(device), data[1].float().to(device)
optimizer.zero_grad()
outputs = model(inputs).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
for name, param in model.named_parameters():
if param.requires_grad:
grad_mean = param.grad.mean() if param.grad is not None else 'No gradient'
# print(f"Layer: {name}, Weight: {param.data.mean()}, Gradient: {grad_mean}")
running_loss += loss.item()
# Calculate Metrics
train_acc = accuracy(predicted, labels)
train_prec = precision(predicted, labels)
train_recall = recall(predicted, labels)
# Print Progress
print(
f'Epoch [{epoch + 1}/{num_epochs}], Batch [{i + 1}/{len(train_loader)}], Train Loss: {loss.item()}, Train Accuracy: {train_acc}, Train Precision: {train_prec}, Train Recall: {train_recall}')
# Reset validation metrics after every epoch
val_accuracy = torchmetrics.Accuracy('binary').to(device)
val_precision = torchmetrics.Precision('binary').to(device)
val_recall = torchmetrics.Recall('binary').to(device)
val_running_loss = 0.0
# Validation
with torch.no_grad():
for data in val_loader:
images, labels = data[0].float().to(device), data[1].float().to(device)
outputs = model(images).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
loss = criterion(outputs, labels)
val_running_loss += loss.item()
# update val-metrics
val_accuracy(predicted, labels)
val_precision(predicted, labels)
val_recall(predicted, labels)
# Calculate final val-metrics
final_val_acc = val_accuracy.compute()
final_val_prec = val_precision.compute()
final_val_recall = val_recall.compute()
# Saves metrics after every epoch
train_acc_list.append(train_acc.item())
val_acc_list.append(final_val_acc.item())
val_prec_list.append(final_val_prec.item())
val_recall_list.append(final_val_recall.item())
train_loss_list.append(running_loss / (i + 1))
val_loss_list.append(val_running_loss / len(val_loader))
print(
f'Epoch {epoch + 1} val_loss: {val_running_loss / len(val_loader)}, val_acc: {final_val_acc}, val_prec: {final_val_prec}, val_recall: {final_val_recall}')
# test loop
with torch.no_grad():
# initialise metrics
test_accuracy = torchmetrics.Accuracy('binary').to(device)
test_precision = torchmetrics.Precision('binary').to(device)
test_recall = torchmetrics.Recall('binary').to(device)
for data in test_loader:
images, labels = data[0].float().to(device), data[1].float().to(device)
outputs = model(images).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
# loss
loss = criterion(outputs, labels)
test_loss += loss.item()
# update metrics
test_accuracy(predicted, labels)
test_precision(predicted, labels)
test_recall(predicted, labels)
# calculate final metrics
final_test_acc = test_accuracy.compute()
final_test_prec = test_precision.compute()
final_test_recall = test_recall.compute()
print(
f"\nTest Accuracy: {final_test_acc}, Test Precision: {final_test_prec}, Test Recall: {final_test_recall}")
# plot metrics
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_acc_list, label='Training Accuracy')
plt.plot(val_acc_list, label='Validation Accuracy')
plt.plot(train_loss_list, label='Training Loss', linestyle='--')
plt.plot(val_loss_list, label='Validation Loss', linestyle='--')
plt.xlabel('Epochs')
plt.ylabel('Score')
plt.ylim(0, 1)
plt.legend()
# plot metrics 2
plt.subplot(1, 2, 2)
plt.plot(val_prec_list, label='Validation Precision')
plt.plot(val_recall_list, label='Validation Recall')
plt.xlabel('Epochs')
plt.ylabel('Score')
plt.legend()
plt.suptitle("(learning_rate={}, batch_size={}, audio_length={}, test_acc={}, test_prec={}, test_recall={})".format(
learning_rate, batch_size, audio_length, final_test_acc, final_test_prec, final_test_recall))
plt.show()
if __name__ == "__main__":
main()
dataprocessing
import numpy as np
import librosa
import os
parent_dir = ''
output_dir = ''
def process_files(directory, label, target_length=196600):
stft_list = []
labels = []
file_counter = 0
npz_file_counter = 0
files = os.listdir(directory)
for i, filename in enumerate(files):
try:
file_path = os.path.join(directory, filename)
print(f"Processing file: {filename}")
y, sr = librosa.load(file_path, sr=48000, dtype=np.float32)
for start in range(0, len(y), target_length):
y_chunk = y[start:start + target_length]
# skip the chunk if it is shorter than target_length
if len(y_chunk) < target_length:
continue
stft = np.abs(librosa.stft(y_chunk)).astype(np.float32)
stft = stft[470:854]  # crop the spectrogram to the selected frequency band
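# With librosa's default STFT settings (n_fft=2048, hop_length=512), a 196600-sample chunk
# yields 384 time frames, and the slice 470:854 keeps 384 frequency bins, giving the
# 384x384 spectrograms described in the readme.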
print(np.shape(stft))
print(stft.dtype)
stft_list.append(stft.tolist())
labels.append(label)
npz_file_counter += 1
npz_filename = os.path.join(output_dir, f'{label}_stft_data_{npz_file_counter}.npz')
print(f"Saving {npz_filename}")
print(np.shape(stft))
if stft_list:  # save only if the list is not empty
np.savez(npz_filename, stft=np.array(stft_list, dtype=np.float32), labels=np.array(labels))
stft_list = []
labels = []
file_counter += 1
except Exception as e:
print(f"Error processing {filename}: {e}")
for subfolder in os.listdir(parent_dir):
subfolder_path = os.path.join(parent_dir, subfolder)
if os.path.isdir(subfolder_path):
process_files(subfolder_path, subfolder)
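The loop above assumes that parent_dir contains one subfolder per class and uses the subfolder name as the label; the training scripts later map the label 'JA' (leak) to 1 and any other label to 0. A minimal sanity check of one generated file (the file name below is only an illustration of the naming pattern produced above):

import numpy as np

sample = np.load('JA_stft_data_1.npz')         # hypothetical output file from the JA subfolder
print(sample['stft'].shape, sample['labels'])  # expected: (1, 384, 384) and ['JA']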
mixup technique
import numpy as np
import os
import random
def mix_spectrograms(Leakfolder, Backgroundfolder, output_folder, label, max_files=2000, start_index=0):
spec_files1 = [f for f in os.listdir(Leakfolder) if f.endswith('.npz')]
spec_files2 = [f for f in os.listdir(Backgroundfolder) if f.endswith('.npz')]
min_files = min(len(spec_files1), len(spec_files2))
if max_files is not None:
min_files = min(min_files, max_files)
random.shuffle(spec_files1)
random.shuffle(spec_files2)
for i in range(min_files):
spec1 = np.load(os.path.join(Leakfolder, spec_files1[i]))['stft']
spec2 = np.load(os.path.join(Backgroundfolder, spec_files2[i]))['stft']
lam = 0.5  # mixup coefficient
mixed_spec = lam * spec1 + (1 - lam) * spec2
# Convert in float32
mixed_spec_float32 = mixed_spec.astype(np.float32)
output_filename = f"mixed_spec_{start_index + i}.npz"
np.savez(os.path.join(output_folder, output_filename), stft=np.expand_dims(mixed_spec_float32, 0), labels=np.array([label]))
# paths
mix_spectrograms('leakfiledirectory',
'backgroundfiledirectory',
'outputdirectory', 'label (JA/NEIN)')
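The generated files reuse the 'stft' and 'labels' keys that the CustomDataset classes above read. When augmentation is run in several batches, the start_index argument offsets the output file names so earlier files are not overwritten; a hypothetical follow-up call (all paths are placeholders):

mix_spectrograms('leakfiledirectory', 'backgroundfiledirectory',
                 'outputdirectory', 'JA', max_files=2000, start_index=2000)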
rebuild_johnsonetal
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import torchmetrics
from torchsummary import summary
from sklearn.metrics import precision_score, recall_score
import random
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
# Paths
train_folder_path = 'trainpath'
val_folder_path = 'valpath'
test_folder_path = 'testpath'
num_epochs = 50
learning_rate = 0.00001
batch_size = 32
audio_length = 'for_description'
class CustomDataset(Dataset):
def __init__(self, file_list):
self.file_list = file_list
def __len__(self):
return len(self.file_list)
def __getitem__(self, idx):
with np.load(self.file_list[idx]) as data_file:
stft = data_file['stft']
lbls = 1 if data_file['labels'] == 'JA' else 0
if stft.size == 0 or np.isnan(stft).any():
print(f"Invalid data at index {idx}, trying another sample")
return self.__getitem__(np.random.randint(len(self.file_list)))
stft = (stft - np.mean(stft)) / (np.std(stft) + 1e-10)  # z-score normalization
return torch.tensor(stft, dtype=torch.float32), torch.tensor(lbls, dtype=torch.float32)
def main():
train_files = [os.path.join(train_folder_path, file) for file in os.listdir(train_folder_path) if file.endswith(".npz")]
val_files = [os.path.join(val_folder_path, file) for file in os.listdir(val_folder_path) if file.endswith(".npz")]
test_files = [os.path.join(test_folder_path, file) for file in os.listdir(test_folder_path) if file.endswith(".npz")]
# Create Datasets
train_data = CustomDataset(train_files)
random.shuffle(val_files)
random.shuffle(test_files)
val_data = CustomDataset(val_files)
test_data = CustomDataset(test_files)
# Create DataLoader
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=44)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers = 44)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=44)
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 6, kernel_size=3, padding=1)
self.pool = nn.MaxPool2d(2, 2)
self.dropout1 = nn.Dropout(0.3)
self.conv2 = nn.Conv2d(6, 12, kernel_size=3, padding=1)
self.conv3 = nn.Conv2d(12, 24, kernel_size=3, padding=1)
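# Note: the 24 * 128 * 3 size of fc1 implies an input spectrogram of roughly 1025x24
# (three 2x2 poolings: 1025 -> 512 -> 256 -> 128 and 24 -> 12 -> 6 -> 3),
# which matches the summary() call further below.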
self.fc1 = nn.Linear(24 * 128 * 3, 96)
self.dropout2 = nn.Dropout(0.3)
self.fc2 = nn.Linear(96, 1)
def forward(self, x):
x = x + torch.randn_like(x) * 0.5 #Gaussian Noise
x = self.pool(F.relu(self.conv1(x)))
x = self.dropout1(x)
x = self.pool(F.relu(self.conv2(x)))
x = self.dropout1(x)
x = self.pool(F.relu(self.conv3(x)))
x = x.view(-1, self.num_flat_features(x))
x = F.relu(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
return torch.sigmoid(x)
def num_flat_features(self, x):
size = x.size()[1:] # all dimensions except the batch dimension
num_features = 1
for s in size:
num_features *= s
return num_features
# Create Model
model = Net()
# Train GPU
device = torch.device('cuda')
model = model.to(device)
# Summary Model
summary(model, (1, 1025, 24))
# Optimizer and Loss
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.BCELoss()  # binary cross-entropy loss
# Lists for Metrics
train_acc_list = []
val_acc_list = []
val_prec_list = []
val_recall_list = []
train_loss_list = []
val_loss_list = []
# Training
for epoch in range(num_epochs):
model.train()
test_loss = 0.0
correct = 0
total = 0
running_loss = 0.0
# Define Metrics
accuracy = torchmetrics.Accuracy('binary').to(device)
precision = torchmetrics.Precision('binary').to(device)
recall = torchmetrics.Recall('binary').to(device)
for i, data in enumerate(train_loader, 0):
inputs, labels = data[0].float().to(device), data[1].float().to(device)
optimizer.zero_grad()
outputs = model(inputs).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
for name, param in model.named_parameters():
if param.requires_grad:
grad_mean = param.grad.mean() if param.grad is not None else 'No gradient'
# print(f"Layer: {name}, Weight: {param.data.mean()}, Gradient: {grad_mean}")
running_loss += loss.item()
# Calculate Metrics
train_acc = accuracy(predicted, labels)
train_prec = precision(predicted, labels)
train_recall = recall(predicted, labels)
# Print Progress
print(
f'Epoch [{epoch + 1}/{num_epochs}], Batch [{i + 1}/{len(train_loader)}], Train Loss: {loss.item()}, Train Accuracy: {train_acc}, Train Precision: {train_prec}, Train Recall: {train_recall}')
# Reset validation metrics after every epoch
val_accuracy = torchmetrics.Accuracy('binary').to(device)
val_precision = torchmetrics.Precision('binary').to(device)
val_recall = torchmetrics.Recall('binary').to(device)
val_running_loss = 0.0
# Validation
with torch.no_grad():
for data in val_loader:
images, labels = data[0].float().to(device), data[1].float().to(device)
outputs = model(images).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
loss = criterion(outputs, labels)
val_running_loss += loss.item()
# update val-metrics
val_accuracy(predicted, labels)
val_precision(predicted, labels)
val_recall(predicted, labels)
# Calculate final val-metrics
final_val_acc = val_accuracy.compute()
final_val_prec = val_precision.compute()
final_val_recall = val_recall.compute()
# Saves metrics after every epoch
train_acc_list.append(train_acc.item())
val_acc_list.append(final_val_acc.item())
val_prec_list.append(final_val_prec.item())
val_recall_list.append(final_val_recall.item())
train_loss_list.append(running_loss / (i + 1))
val_loss_list.append(val_running_loss / len(val_loader))
print(
f'Epoch {epoch + 1} val_loss: {val_running_loss / len(val_loader)}, val_acc: {final_val_acc}, val_prec: {final_val_prec}, val_recall: {final_val_recall}')
# test loop
with torch.no_grad():
# initialise metrics
test_accuracy = torchmetrics.Accuracy('binary').to(device)
test_precision = torchmetrics.Precision('binary').to(device)
test_recall = torchmetrics.Recall('binary').to(device)
for data in test_loader:
images, labels = data[0].float().to(device), data[1].float().to(device)
outputs = model(images).squeeze()
predicted = (outputs.data > 0.5).float().squeeze()
# loss
loss = criterion(outputs, labels)
test_loss += loss.item()
# update metrics
test_accuracy(predicted, labels)
test_precision(predicted, labels)
test_recall(predicted, labels)
# calculate final metrics
final_test_acc = test_accuracy.compute()
final_test_prec = test_precision.compute()
final_test_recall = test_recall.compute()
print(
f"\nTest Accuracy: {final_test_acc}, Test Precision: {final_test_prec}, Test Recall: {final_test_recall}")
# plot metrics
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(train_acc_list, label='Training Accuracy')
plt.plot(val_acc_list, label='Validation Accuracy')
plt.plot(train_loss_list, label='Training Loss', linestyle='--')
plt.plot(val_loss_list, label='Validation Loss', linestyle='--')
plt.xlabel('Epochs')
plt.ylabel('Score')
plt.ylim(0, 1)
plt.legend()
# plot metrics 2
plt.subplot(1, 2, 2)
plt.plot(val_prec_list, label='Validation Precision')
plt.plot(val_recall_list, label='Validation Recall')
plt.xlabel('Epochs')
plt.ylabel('Score')
plt.legend()
plt.suptitle("(learning_rate={}, batch_size={}, audio_length={}, test_acc={}, test_prec={}, test_recall={})".format(
learning_rate, batch_size, audio_length, final_test_acc, final_test_prec, final_test_recall))
plt.show()
if __name__ == "__main__":
main()