몬테카를로 방법(Montecarlo Method)

몬테카를로 방법(Monte Carlo method)은 어떤 문제에 대한 해를 무수히 많은 시도를 통해 얻어진 확률을 기반으로 하는 계산법입니다. 아래의 그림은 위키디피아의 몬테카를로에 대한 소개에 나온 이미지로써 원주율 π 값을 구하는 예입니다.

넓이가 1인 정사각형, 이 정사각형 내부에 반지름이 1인 사분원이 있습니다. 그러면 사분원이 차지하는 넓이는 π/4가 될 것이다. 이제 0 이상, 1 이하인 x와 y의 값을 무작위로 뽑은 후 x^2 + y^2 ≤ 1의 조건을 만족할 확률은 사분원의 넓이와 같은 π/4가 됩니다.

위의 논리를 코드로 작성하여 π를 구하면 다음과 같습니다.

import random

n = 1000000 # 백만번의 시도
count = 0

for i in range(n):
    # x, y를 무작위로 0~1사이의 값으로 결정
    x = random.uniform(0, 1)
    y = random.uniform(0, 1)

    # 사분원 내부에 발생하는 경우수 
    if (x**2 + y**2) <= 1: count += 1

# 백만번의 시도 중 사분원 내부일 경우에 대한 확률은 사분원의 넓이이므로 이를 4배 곱하여 π 계산
print('phi', 4*count/n)

위의 코드 중 4*count/n은 다음의 비례식을 통해 도출된 결과입니다.

전체확률 : 사분원 내부일 경우에 대한 확률 = 사각형의 넓이 : 사분원의 넓이

위의 비례식에 수치값을 대입하면 다음과 같습니다.

1 : count/n = 1 : π/4

몬테카를로 방법을 통해 실제와 가까운 해를 얻기 위해서는 방대한 단순 계산을 매우 빠르게 처리할 수 있는 컴퓨터가 필수입니다. 이 몬테카를로 방법은 핵폭탄이나 수소폭탄의 개발에서 핵심적인 역활을 담당했다고 합니다. 제 경우도 핵폭탄 개발이 필요해서... 가 아닌 강화학습(Reinforcement learning)의 한 방법으로 접하게 되었습니다.

신경망을 이용한 비선형 모델의 회귀분석

딥러닝을 위한 신경망은 기본적으로 선형회귀분석을 기반으로 합니다. 선형 회귀 분석이라는 전제 조건은 아주 복잡한 모델, 즉 비선형인 형태의 모델은 추론할 수 없지만, 신경망의 층(Layer)를 깊게 쌓으면서 그 중간에 비선형성을 부여하는 활성화 함수를 넣어주게 되면 선형회귀분석에 기반한 신경망으로도 아주 복잡한 비선형 모델도 추론할 수 있다는 것입니다. 예를 들어, 아래와 같은 분포를 가지는 데이터셋에 대한 회귀분석도 가능합니다.

위의 데이터는 다음과 같은 공식에 대해서, y값에 표준편차 30인 정규분포의 잡음(Noise)를 추가해 생성한 것입니다.

    $$y = 0.5 \times x^{3} - 0.5 \times x^{2} - 90 \times sin(x^{2}) + 1 $$

이제 위의 데이터셋을 이용해 딥러닝 학습을 통해 비선형 모델에 대한 추론에 대한 코드를 정리하겠습니다. 코드는 파이선으로, 그리고 딥러닝 라이브러리는 파이토치를 사용했습니다.

먼저 필요한 패키지를 임포트합니다.

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.init as init
import matplotlib.pyplot as plt

학습을 위해 데이터가 필요한데, 앞서 언급한 공식을 활용하여 총 5000개의 (x, y) 값의 데이터를 생성합니다. 물론 y 값에는 마찬가지로 앞서 언급한 표준편차가 30인 정규분포로 생성된 잡음을 반영합니다. 아래는 이에 대한 코드입니다.

num_data = 5000
noise = init.normal_(torch.FloatTensor(num_data,1), std=30)
x = init.uniform_(torch.Tensor(num_data,1),-10,10)

def func(x): return 0.5*(x**3) - 0.5*(x**2) - torch.sin(2*x)*90 + 1 
y_noise = func(x) + noise

신경망 모델을 생성합니다. 신경망 모델에 대한 코드는 아래와 같습니다.

model = nn.Sequential(
    nn.Linear(1,5),
    nn.LeakyReLU(0.2),
    nn.Linear(5,10),
    nn.LeakyReLU(0.2),
    nn.Linear(10,10),
    nn.LeakyReLU(0.2),    
    nn.Linear(10,10),
    nn.LeakyReLU(0.2),        
    nn.Linear(10,5),
    nn.LeakyReLU(0.2),          
    nn.Linear(5,1),
)

위의 신경망을 도식화하면 다음과 같습니다.

활성화 함수로 Leaky ReLU를 사용한 이유는, Sigmoid의 경우 기울기 소실이 발생하여 학습이 잘이루어지지 않고 일반 ReLU를 사용할 경우 학습 대상이 되는 가중치와 편향이 음수가 될 경우에 입력값까지 음수가 되면 최종 활성화 값이 항상 0이 되어 이 값이 뉴런에 전달되고, 전달 받은 뉴런이 제 역활을 하지 못하는 현상(문헌에서는 Dying Neuron이라고 함)이 발생하기 때문입니다 Leaky ReLU는 기울기 소실 문제와 입력값이 음수일때에도 일반 ReLU처럼 0이 아닌 가중치(위에서는 0.2)가 반영된 값이 활성값으로 결정되어 Dying Neuron 현상을 막아줍니다.

다음은 학습에 대한 코드입니다.

gpu = torch.device('cuda')
loss_func = nn.L1Loss().to(gpu)
optimizer = torch.optim.Adam(model.parameters(), lr=0.002)

model = model.to(gpu)
x = x.to(gpu)
y_noise = y_noise.to(gpu)

num_epoch = 20000
loss_array = []
for epoch in range(num_epoch):
    optimizer.zero_grad()
    output = model(x)
    
    loss = loss_func(output,y_noise)
    loss.backward()
    optimizer.step()
    
    loss_array.append(loss)

    if epoch % 100 == 0:
        print('epoch:', epoch, ' loss:', loss.item())

손실값은 매우 단순한 L1 손실을 사용는데, 위의 학습을 위한 데이터셋의 경우 오차값의 절대값이 L1 값이고 오차값에 대해 손실값이 비례하므로 L1 손실은 적당하고 학습 속도가 빠릅니다. 그리고 가중치에 대한 최적화 방법은 Adam을 사용했습니다. 일반 SGD 방식은 그 방식이 매우 단순해서 좀처럼 학습이 되지 않습니다.

이제 학습 동안 손실값의 추이와 추론된 신경망의 모델에 대한 결과를 그래프로 나타내기 위한 코드는 다음과 같습니다.

plt.plot(loss_array)
plt.show()

plt.figure(figsize=(10,10))

x = x.cpu().detach().numpy()
y_noise = y_noise.cpu().detach().numpy()
output = output.cpu().detach().numpy()

plt.scatter(x, y_noise, s=1, c="gray")
plt.scatter(x, output, s=1, c="red")

plt.show()

위 코드에서 손실에 대한 그래프 결과는 다음과 같습니다.

손실값이 매 에폭마다 감소하는 것을 보면 학습이 제대로 이루어지고 있다는 것을 알 수 있습니다. 그리고 가장 중요한 그래프인, 신경망 학습의 추론 결과에 대한 그래프입니다.

회색 지표는 학습 데이터이고 빨간색 지표가 학습된 모델이 추론한 결과입니다. 데이테에 매우 근접한 추론 결과를 나타내고 있는 것을 볼 수 있습니다. 그래프가 곡선처럼 보이지만 사실은 직선으로 구성된 그래프입니다. 이는 앞서 언급했듯이 신경망이 선형회귀에 기반하고 있기 때문입니다.

Python의 람다(Lambda)

파이썬의 람다 기능은 익명 함수(Anonymous), 즉 이름이 없는 함수를 정의하기 위한 용도로 사용됩니다. 참고로 컴퓨터 분야에서 정의는 없던 것에 대한 구체적인 생성을 의미하며, 선언은 일단 이름만 붙여두고 구체적인 생성은 다른 곳에서 대신하는 것을 의미합니다.

일반적으로 파이썬에서 함수는 다음처럼 정의됩니다.

def func(a):
    return a+1

위와 동일한 람다 방식의 함수 정의는 다음과 같죠. 위의 동일성을 유지하기 위해 람다 함수를 func에 할당하여 이름을 붙인 경우입니다. 호출은 일반함수과 동일합니다.

func = lambda a: a+1

다른 예로, 인자를 두개 받아 받은 인자값을 합해 반환하는 람다 함수는 다음과 같습니다.

func = lambda a,b: a+b

이 람다 함수의 사용은 생각해 보면, map처럼 함수를 인자로 받는 함수에서와 같습니다.

r = list(map(lambda a,b: a+b, [1,2,3], [10,20,30]))
print(r) # [11, 22, 33]

덧붙여 람다 함수를 이용하여 함수를 반환하는 함수를 정의할 수 있습니다. 클로저(Closure)라고도 하죠.

def makeFunc(n):
    return lambda a : a % n == 1

isOdd = makeFunc(2)

print(isOdd(11))

물론 클로저 함수를 정의하기 위해 람다를 사용할 필요는 없습니다. 아래처럼요.

def makeFunc(n):
    def func(a):
        return a % n == 1
    return func

단, 위의 코드는 함수의 이름(func)을 불필요하게 부여했다는 점이 거슬립니다.

람다를 통한 함수의 정의는 제약이 많습니다. 람다 함수는 반드시 반환값에 대한 단 한줄의 코드로만 구성되어야 한다는 점입니다. 그렇다면 람다함수에서 어떤 논리적인 조건처리는 어떻게 구현할 수 있을까요? 거기에 대한 힌트는 아래의 코드를 통해 살펴볼 수 있습니다.

a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
r = list(
        map(
            lambda x: 
                'BIG' if x > 5 
                else 'SMALL' if x < 5 
                else 'MIDDLE',
            a
        )
    )
print(r) # ['SMALL', 'SMALL', 'SMALL', 'SMALL', 'MIDDLE', 'BIG', 'BIG', 'BIG', 'BIG', 'BIG']

이해를 돕고자 들여쓰기를 했는데, 리스트 요소 중 5보다 작으면 SMALL, 5보다 크면 BIG, 딱 5이면 MIDDLE 문자열로 구성된 또 다른 리스트를 반환하는 것입니다.

실제 코딩에서는 들여쓰기가 제거된 아래의 형태가 되겠네요.

a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
r = list(map(lambda x: 'BIG' if x > 5 else 'SMALL' if x < 5 else 'MIDDLE', a))
print(r)

특성값 2개를 입력하여 3가지로 분류하는 신경망

이 글에서 소개하고 있는 소스코드는 사이토 고키가 저술한 “Deep Learning from Scratch”의 첫장에서 소개하고 있는 신경망입니다. 한국어판으로는 한빛미디어의 “밑바닥부터 시작하는 딥러닝2″로도 보실 수 있습니다. 해당 도서의 원저자의 허락 하에 글을 올립니다. 특히 공간상의 좌표와 속성값들에 대한 분류에 대한 내용인지라 GIS 분야에서 흔하게 활용될 수 있는 내용이라고 생각됩니다. 보다 자세한 내용은 해당도서를 추천드립니다.

공간 상의 (x,y)에 대한 하나의 지점에 대해서.. 가재, 물방개, 올챙이인 3가지 종류의 식생이 분포하고 있는 생태계가 있다고 합시다. 즉, 입력되는 특성값은 2개일때 3개 중 한개를 결정해야 합니다. 다시 말해, 이미 채집한 데이터가 있고, 이 데이터를 이용해 예측 모델을 훈련시킬 것이며, 훈련된 모델을 이용하면 임이의 (x,y) 위치에 대해 존재하는 식생이 무엇인지를 예측하고자 합니다. 그럼 먼저 예측 모델을 훈련할때 사용할 채집 데이터가 필요한데, 아래의 load_spiral_data 함수가 이러한 채집 데이터를 생성해 줍니다. 아래의 코드는 load_spiral_data 함수를 이용해 채집 데이터를 생성하고 시각화합니다.

import numpy as np
import matplotlib.pyplot as plt

def load_spiral_data(seed=7777):
    np.random.seed(seed)
    DIM = 2  # 입력 데이터 특성 수
    CLS_NUM = 3  # 분류할 클래스 수
    N = 100  # 클래스 하나 당 샘플 데이터 수

    x = np.zeros((N*CLS_NUM, DIM))
    t = np.zeros((N*CLS_NUM, CLS_NUM), dtype=np.int)

    for j in range(CLS_NUM):
        for i in range(N): # N*j, N*(j+1)):
            rate = i / N
            radius = 1.0*rate
            theta = j*4.0 + 4.0*rate + np.random.randn()*0.2

            ix = N*j + i
            x[ix] = np.array([radius*np.sin(theta),
                              radius*np.cos(theta)]).flatten()
            t[ix, j] = 1

    return x, t

x, t = load_spiral_data()
N = 100
CLS_NUM = 3
markers = ['o', 'x', '^']
for i in range(CLS_NUM):
    plt.scatter(x[i*N:(i+1)*N, 0], x[i*N:(i+1)*N, 1], s=40, marker=markers[i])
plt.show()

결과는 아래와 같습니다.

훈련용 데이터가 준비되었으므로, 이제 예측 모델에 대한 신경망에 대한 코드를 살펴보겠습니다. 아래는 은닉층이 1개인 신경망에 대한 클래스입니다. 은닉층이 1개이구요. 이 클래스의 생성자에 대한 인자는 입력되는 특성값의 개수(input_size)와 은닉층의 뉴런 개수(hidden_size) 그리고 분류 결과 개수(output_size)입니다.

class Net:
    def __init__(self, input_size, hidden_size, output_size):
        I, H, O = input_size, hidden_size, output_size

        W1 = 0.01 * np.random.randn(I, H)
        b1 = np.zeros(H)
        W2 = 0.01 * np.random.randn(H, O)
        b2 = np.zeros(O)

        self.layers = [
            Affine(W1, b1),
            Sigmoid(),
            Affine(W2, b2)
        ]
        self.loss_layer = SoftmaxWithLoss()

        self.params, self.grads = [], []
        for layer in self.layers:
            self.params += layer.params
            self.grads += layer.grads

    def predict(self, x):
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def forward(self, x, t):
        score = self.predict(x)
        loss = self.loss_layer.forward(score, t)
        return loss

    def backward(self, dout=1):
        dout = self.loss_layer.backward(dout)
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

신경망의 뉴런값과 가중치 그리고 편향에 대한 연산을 위해 Affine 클래스가 사용됬으며, 신경망의 기반이 되는 선형 회귀에 비선형 회귀 특성을 부여하는 활성화함수로 Sigmoid 클래스가 사용되었습니다. 아울러 3개 이상의 출력결과를 확률로 해석하고 이를 기반으로 손실값을 계산할 수 있는 Softmax와 Cross Entropy Error를 하나의 합친 SoftmaxWithLoss 클래스가 사용되었습니다.

먼저 Affine, Sigmoid 클래스의 코드는 다음과 같습니다.

class Affine:
    def __init__(self, W, b):
        self.params = [W, b]
        self.grads = [np.zeros_like(W), np.zeros_like(b)]
        self.x = None

    def forward(self, x):
        W, b = self.params
        out = np.dot(x, W) + b
        self.x = x
        return out

    def backward(self, dout):
        W, b = self.params
        dx = np.dot(dout, W.T)
        dW = np.dot(self.x.T, dout)
        db = np.sum(dout, axis=0)

        self.grads[0][...] = dW
        self.grads[1][...] = db
        return dx

class Sigmoid:
    def __init__(self):
        self.params, self.grads = [], []
        self.out = None

    def forward(self, x):
        out = 1 / (1 + np.exp(-x))
        self.out = out
        return out

    def backward(self, dout):
        dx = dout * (1.0 - self.out) * self.out
        return dx

그리고 SoftmaxWithLoss 클래스는 다음과 같습니다.

class SoftmaxWithLoss:
    def __init__(self):
        self.params, self.grads = [], []
        self.y = None  # Softmax의 출력
        self.t = None  # 정답 레이블

    def forward(self, x, t):
        self.t = t
        self.y = softmax(x)

        # 정답 레이블이 원핫 벡터일 경우 정답의 인덱스로 변환
        if self.t.size == self.y.size:
            self.t = self.t.argmax(axis=1)

        loss = cross_entropy_error(self.y, self.t)
        return loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]

        dx = self.y.copy()
        dx[np.arange(batch_size), self.t] -= 1
        dx *= dout
        dx = dx / batch_size

        return dx

우의 클래스는 Softmax 연산과 Cross Entropy Error 연산을 각각 softmax와 cross_entropy_error 함수를 통해 처리하고 있습니다. 이 두 함수는 아래와 같습니다.

def softmax(x):
    if x.ndim == 2:
        x = x - x.max(axis=1, keepdims=True)
        x = np.exp(x)
        x /= x.sum(axis=1, keepdims=True)
    elif x.ndim == 1:
        x = x - np.max(x)
        x = np.exp(x) / np.sum(np.exp(x))

    return x

def cross_entropy_error(y, t):
    if y.ndim == 1:
        t = t.reshape(1, t.size)
        y = y.reshape(1, y.size)
        
    # 정답 데이터가 원핫 벡터일 경우 정답 레이블 인덱스로 변환
    if t.size == y.size:
        t = t.argmax(axis=1)
             
    batch_size = y.shape[0]

    return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size

학습 데이터와 모델까지 준비되었으므로, 이제 실제 학습에 대한 코드를 살펴보겠습니다. 먼저 학습을 위한 하이퍼파라메터 및 모델생성 그리고 기타 변수값들입니다.

# 하이퍼파라미터
max_epoch = 300
batch_size = 30
hidden_size = 10
learning_rate = 1.0

# 모델
model = Net(input_size=2, hidden_size=hidden_size, output_size=3)

# 가중치 최적화를 위한 옵티마이저
optimizer = SGD(lr=learning_rate)

# 학습에 사용하는 변수
data_size = len(x)
max_iters = data_size // batch_size
total_loss = 0
loss_count = 0
loss_list = []

다음은 실제 학습 수행 코드입니다.

for epoch in range(max_epoch):
    # 데이터 뒤섞기
    idx = np.random.permutation(data_size)
    x = x[idx]
    t = t[idx]

    for iters in range(max_iters):
        batch_x = x[iters*batch_size:(iters+1)*batch_size]
        batch_t = t[iters*batch_size:(iters+1)*batch_size]

        # 기울기를 구해 매개변수 갱신
        loss = model.forward(batch_x, batch_t)
        model.backward()
        optimizer.update(model.params, model.grads)

        total_loss += loss
        loss_count += 1

        # 정기적으로 학습 경과 출력
        if (iters+1) % 10 == 0:
            avg_loss = total_loss / loss_count
            print('| 에폭 %d |  반복 %d / %d | 손실 %.2f'
                  % (epoch + 1, iters + 1, max_iters, avg_loss))
            loss_list.append(avg_loss)
            total_loss, loss_count = 0, 0

출력값으로써 매 학습 단계마다 손실값이 줄어드는 것을 확인할 수 있습니다.

아래는 위의 학습이 완료되면 그 결과를 시각화해주는 코드입니다.

plt.plot(np.arange(len(loss_list)), loss_list, label='train')
plt.xlabel('Loop (x10)')
plt.ylabel('Loss')
plt.show()

결과는 다음과 같습니다.

이제 이 학습된 모델을 이용해 새로운 입력 데이터를 분류하고 이에 대한 결과를 효과적으로 시각해 주는 코드는 아래와 같습니다.

# 경계 영역 플롯
h = 0.001
x_min, x_max = x[:, 0].min() - .1, x[:, 0].max() + .1
y_min, y_max = x[:, 1].min() - .1, x[:, 1].max() + .1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))
X = np.c_[xx.ravel(), yy.ravel()]
score = model.predict(X)
predict_cls = np.argmax(score, axis=1)
Z = predict_cls.reshape(xx.shape)
plt.contourf(xx, yy, Z)
plt.axis('off')

# 데이터점 플롯
x, t = load_spiral_data()
N = 100
CLS_NUM = 3
markers = ['o', 'x', '^']
for i in range(CLS_NUM):
    plt.scatter(x[i*N:(i+1)*N, 0], x[i*N:(i+1)*N, 1], s=40, marker=markers[i])
plt.show()

결과는 아래와 같습니다.