이전 시간에는 MLP에 대해서 알아보았는데 이 모델의 문제는 입력의 크기가 고정되어있고, 순차 데이터를 처리할 수 없고 즉 이전의 입력을 기억하지 못한다는 문제가 있다.
이 문제를 해결하기 위해 RNN(Recurrent Neural Network)이 등장했다.
RNN으로 가변 길이의 시퀀스를 처리할 수 있게 되었고, 순차 데이터의 순서를 인식하고, 이전의 입력을 기억할 수 있게 되었다.
작동 원리의 핵심은 입력과 이전의 hidden state를 결합해서 출력을 생성한다는 것이다.
RNN 구조
이번 글에서는 RNN을 pytorch로 구현해보고, numpy로 구현해보고 끝내겠다.
import torch
import torch.nn as nn
import torch.optim as optim
input =torch.tensor([[[ 1 , 0 , 0 ],[ 1 , 1 , 1 ]],[[ 0 , 1 , 0 ],[ 1 , 1 , 1 ]],[[ 0 , 0 , 1 ],[ 1 , 1 , 1 ]]],dtype=torch.float32)
target = torch.tensor([[ 0 , 0 , 1 ],[ 1 , 0 , 0 ],[ 0 , 1 , 0 ]],dtype=torch.float32)
command = torch.tensor([ 1 , 1 , 1 ])
class RNN ( nn . Module ):
def __init__ ( self , input_size , hidden_size , output_size , num_layers = 1 ):
super(RNN, self ). __init__ ()
self .hidden_size = hidden_size
self .num_layers = num_layers
# RNN Layer
self .rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first= True ,nonlinearity= 'tanh' )
# Fully connected layer
self .fc = nn.Linear(hidden_size, output_size)
def forward ( self , x ):
# Initialize hidden state with zeros
h0 = torch.zeros( self .num_layers, x.size( 0 ), self .hidden_size).to(x.device)
# Forward propagate RNN
out, _ = self .rnn(x, h0)
# Decode the hidden state of the last time step
out = self .fc(out[:, -1 , :])
return out
model = RNN( 3 , 3 , 3 )
criterion = nn.CrossEntropyLoss() # For regression tasks
optimizer = optim.SGD(model.parameters(), lr= 0.01 )
num_epochs= 2000
for epoch in range (num_epochs):
model.train()
# Forward pass
outputs = model(input.reshape( 3 , 2 , 3 ))
loss = criterion(outputs, target)
# Backward pass and optimization
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch+ 1 ) % 1000 == 0 :
print ( f 'Epoch [ {epoch+ 1 } / {num_epochs} ], Loss: {loss.item() :.4f } ' )
model. eval ()
with torch.no_grad():
print (model(torch.tensor([[ 0 , 1 , 0 ],[ 1 , 1 , 1 ]],dtype=torch.float32).reshape( 1 , 2 , 3 )))
print (model(torch.tensor([[ 0 , 0 , 1 ],[ 1 , 1 , 1 ]],dtype=torch.float32).reshape( 1 , 2 , 3 )))
print (model(torch.tensor([[ 1 , 0 , 0 ],[ 1 , 1 , 1 ]],dtype=torch.float32).reshape( 1 , 2 , 3 )))
결과
tensor([[ 3.8425, -1.9969, -2.5851]]) tensor([[-2.2396, 3.5427, -2.2349]]) tensor([[-2.2324, -2.3231, 3.5305]])
0,1,0을 넣고 1,1,1을 넣은 뒤 출력이 tensor([[ 3.8425, -1.9969, -2.5851]])
0,0,1을 넣고 1,1,1을 넣은 뒤 출력이 tensor([[-2.2396, 3.5427, -2.2349]])
1,0,0을 넣고 1,1,1을 넣은 뒤 출력이 tensor([[-2.2324, -2.3231, 3.5305]])
같은 1,1,1을 넣어줬지만 전 값이 무엇인지에 따라 다른 출력이 나오는걸 볼 수 있다. 이말은 즉 이전 상태를 기억하고 이를 이용해서 현재상태를 구한다는 말이다.
이제 이 코드를 기반으로 numpy만 써서 rnn을 구현해보겠다
import numpy as np
import random
initial_values = [np.array([ 0 , 0 , 0 ]),np.array([ 0 , 0 , 0 ]),np.array([ 0 , 0 , 0 ])]
inc = [np.array([ 1 , 0 , 0 ]),np.array([ 0 , 1 , 0 ]),np.array([ 0 , 0 , 1 ])]
mat = np.array([initial_values[i% 3 ]+inc[random.randint( 0 , 100 )% 3 ] for i in range ( 30 )])
mat
class RNN_BASE:
def __init__ ( self , input_size , hidden_size , seq_len , lr ):
#seq_len - backward때 몇개의 과거까지 train 할지
self .seq_len = seq_len
self .learning_rate = lr
self .h_size=hidden_size
self .U = np.random.randn(hidden_size,input_size) * 0.1
self .V = np.random.randn(hidden_size,hidden_size) * 0.1
self .W = np.random.randn(input_size, hidden_size) * 0.1
self .b0 = np.random.randn(hidden_size, 1 ) * 0.1
self .b1 = np.random.randn(input_size, 1 ) * 0.1
self .h0 = np.zeros((hidden_size, 1 ))
self .h1 = np.zeros(( self .h_size, 1 ))
self .X = []
def reset ( self ):
self .h0 = np.zeros(( self .h_size, 1 ))
self .h_state = []
self .X = []
self .h_state.append( self .h0)
def forward ( self , input ):
self .X.append(input)
if len ( self .X) > ( self .seq_len):
self .X.pop( 0 )
self .h1 = np.tanh( self .U @ input + self .V @ self .h_state[ -1 ] + self .b0)
self .y = self .W @ self .h1 + self .b1
ps = np.exp( self .y) / np. sum (np.exp( self .y)) # softmax계산
self .h_state.append( self .h1)
if len ( self .h_state) > ( self .seq_len+ 1 ):
self .h_state.pop( 0 )
return self .y
def backward ( self , target ):
db1 = ( self .y-target)
dW = db1 @ self .h_state[ -1 ].T #(self.y-target) @ self.h1.T
db0 = self .W.T @ db1 *( 1 - self .h_state[ -1 ]** 2 ) # self.W.T @ (self.y-target) *(1 - self.h1**2)
dV = db0 @ self .h_state[ -2 ].T # self.W.T @ (self.y-target) *(1 - self.h1**2) @self.h0.T
dU = db0 @ self .X[ -1 ].T # self.W.T @ (self.y-target) *(1 - self.h1**2) @ self.input.T
dh = self .V.T @ db0
for i in range ( len ( self .X) -1 ):
db0 += dh * ( 1 - self .h_state[ -2 -i]** 2 )
dV += dh * ( 1 - self .h_state[ -2 -i]** 2 ) @ self .h_state[ -3 -i].T
dU += dh * ( 1 - self .h_state[ -2 -i]** 2 ) @ self .X[ -2 -i].T
dh = self .V.T @ dh * ( 1 - self .h_state[ -2 -i]** 2 )
self .U -= self .learning_rate * dU
self .V -= self .learning_rate * dV
self .b0 -= self .learning_rate * db0
self .W -= self .learning_rate * dW
self .b1 -= self .learning_rate * db1
class RNN:
def __init__ ( self , input_size , hidden_size , seq_len , lr ):
#seq_len - backward때 몇개의 과거까지 train 할지
self .m=RNN_BASE(input_size, hidden_size , seq_len ,lr)
self .i= input_size
def train ( self , x , y ):
self .m.reset()
for i in range ( len (x)):
self .m.forward(x[i].reshape( self .i, 1 ))
self .m.backward(y[i].reshape( self .i, 1 ))
def predict ( self , x , c ):
out=[]
self .m.reset()
for i in range (c):
o= self .m.forward(x.reshape( self .i, 1 ))
idx=np.argmax(o)
out.append(idx)
x= np.zeros(( self .i, 1 ))
x[idx]= 1
return out
구현은 forward는 식대로 하고 backward는 chain rule으로 차근차근 하면 된다.
하지만 까다로운건 학습시킬때 현재 상태가 나오는게 이전 상태의 hidden state의 영향을 받기 때문에 이전 상태의 hidden state에 영향을 준 변수들도 고려해서 적용시켜야 한다는 것이다.. 상당히 까다롭고 뭉텅이를 넣어서 학습시킬 수도 있지만 나는 처음거 학습 시키고 그 다음거 학습시킬 때, seq_len-1 개수 만큼 과거로 돌아가서 그 요소까지 학습에 참여하도록 하였다.
m = RNN( 3 , 5 ,3 , 0.03 )
input =np.array([[ 1 , 0 , 0 ],[ 0 , 1 , 0 ],[ 0 , 0 , 1 ]])
target = np.array([[ 0 , 0 , 1 ],[ 1 , 0 , 0 ],[ 0 , 1 , 0 ]])
command = np.array([[ 1 , 1 , 1 ],[ 1 , 1 , 1 ],[ 1 , 1 , 1 ]]).reshape( 3 , 3 )
i= 3000
while True :
i-= 1
if i< 0 :
break
m.train(mat[: -16 ],mat[ 1 :])
print (m.predict(mat[ 0 ], 14 ))
이런식으로 랜덤하게 0~2까지 랜덤 순서로 학습을 시켜보았는데
mat의 인덱스 1번부터 20번까지가
[0, 2, 2, 0, 0, 1, 1, 1, 2, 1, 2, 1, 1, 0, 2, 0, 0, 0, 2, 1]
이거면
[0, 2, 2, 0, 0, 1, 1, 1, 2, 1, 2, 1, 1, 0]
14개 일때 이렇게 어느정도 맞게 나오는 경우도 있고 아닌 경우도 있다.
input 벡터가 3, hidden이 5일때 15개정도까지가 어느정도 학습이 되고 그 이상은 쉽지않다.
이러한 문제를 long term dependency 문제라고한다..
RNN을 구현할때 또 헷갈렸던 부분이 seq_len이라고 쓰인 몇개의 과거까지 update 할 것인가를 생각하는거였다.
파이토치에서 RNN을 구현하면 저 부분이 없기떄문에 더 헷갈렸다.
찾아본 결과 파이토치는 기본적으로 모든 과거를 update하는 것이었다.
모든 과거를 update하면 사실 너무 먼 과거는 그래디언트 소실로 거의 0에 가까운 것을 수정하는 것이기 떄문에
트렁케이션을 구현해서 효율을 높일 수 있다.
파이토치가 기울기를 구하는 방법은 관심 변수에 대해 계산이 일어날 때마다 그래프 형식으로 줄줄이 관심 변수에 대한 정보를 저장하는데,
너무 긴 seq에 대해 일정 길이마다 hidden = tuple ([h.detach() for h in hidden]) 이런식으로 detach해주면 이전 계산 그래프를 없애고 결과 값만 남겨 그 텐서를 새로운 그래프의 시작점으로 만들어 계산 정보를 다시 쌓는 방식으로 먼 과거에 대해 0에가까운 무의미한 기울기를 계산하는 걸 막을 수 있다.
게다가 메모리 효율도 향상된다