読者です 読者をやめる 読者になる 読者になる

KZKY memo

自分用メモ.

Theano: Scan Op

python theano

現時点でTheano(0.7.0).いまさらながら,scan opをちゃんと見てみることにした.scan opのいいところは,variable length input/outputに対応できること,および,loopの中に,symbolに対してconditionを付けられること.

基本的に,このページを順に見ていく.

サンプルコードは,ここにある.

Simple loop with accumulation: Computing A^k

{ \displaystyle
A^k
}

をコードで書きたいときには,普通,次の様に書く

non-scan power code
result = 1
for i in range(k):
    result = result * A

これを見ると,

  • resultの初期値化
  • 変わらない変数 A
  • 関数 (この場合はaccumulation)

がある.これをscanの引数に対応させると,

  • resultの初期値化: output_info
  • 変わらない変数 A: non_sequences
  • 関数 (この場合はaccumulation): lambda or named function

となる.これは最も簡単なケース.

上記コードを,Scanに置き換える場合
import theano
import theano.tensor as T

k = T.iscalar("k")
A = T.vector("A")

# Symbolic description of the result
result, updates = theano.scan(fn=lambda prior_result, A: prior_result * A,
                              outputs_info=T.ones_like(A),
                              non_sequences=A,
                              n_steps=k)

# Optimization saving memory.
final_result = result[-1]

# Compiled function that returns A**k
power = theano.function(inputs=[A, k], outputs=final_result, updates=updates)

print power(range(10), 2)
print power(range(10), 4)

result自体は,list of Tesnorで返ってくる.初めの次元がtime step.この例では,computational graphを作る前に,result[-1]のみをtheano.functionの引数にすることで,メモリ最適化をしている.また,n_stepsを指定しており,variable lengthではないので注意.

Iterating over the first dimension of a tensor: Calculating a polynomial

通常のfor x in a_listのloopのように(a_listがlist of list of ...を前提とする),初めの次元でイテレーションができる.その時には,scan引数のsequecencesを使用する.

多項式のサンプル
import numpy
import theano
import theano.tensor as T

coefficients = theano.tensor.vector("coefficients")
x = T.scalar("x")

max_coefficients_supported = 10000

# Generate the components of the polynomial
components, updates = theano.scan(fn=lambda coefficient, power, free_variable: coefficient * (free_variable ** power),
                                  outputs_info=None,
                                  sequences=[coefficients, theano.tensor.arange(max_coefficients_supported)],
                                  non_sequences=x)
# Sum them up
polynomial = components.sum()

# Compile a function
calculate_polynomial = theano.function(inputs=[coefficients, x], outputs=polynomial)

# Test
test_coefficients = numpy.asarray([1, 0, 2], dtype=numpy.float32)
test_value = 3
print calculate_polynomial(test_coefficients, test_value)
print 1.0 * (3 ** 0) + 0.0 * (3 ** 1) + 2.0 * (3 ** 2)


このサンプルで確認したいのは次の4点.

  • サンプルでは多項式の項を全部返して最後に,components.sum()しているが,scanの中でcumsumしてから最後の項を取るほうがメモリ最適になる
  • output_info=Noneにすると,fnの引数にprevious resultを入れないで良い
  • python's enumerateのシミュレートとして,theano.thensor.arange(n)を使う.itereated indicesを使いたい場合に使うんだろう
  • sequencesに同じ長さでない変数を入れると,長さが小さい方に合わさる様に打ち切られる

重要なのはscanの引数がどうfnの引数に渡るのか

sequences (if any), prior result(s) (if needed), non-sequences (if any)

のように渡る.入力,出力,それ以外と覚えればいいか.

Simple accumulation into a scalar, ditching lambda

Named Function Sample
import numpy as np
import theano
import theano.tensor as T

up_to = T.iscalar("up_to")

# define a named function, rather than using lambda
def accumulate_by_adding(arange_val, sum_to_date):
    return sum_to_date + arange_val
seq = T.arange(up_to)

# An unauthorized implicit downcast from the dtype of 'seq', to that of
# 'T.as_tensor_variable(0)' which is of dtype 'int8' by default would occur
# if this instruction were to be used instead of the next one:
# outputs_info = T.as_tensor_variable(0)

outputs_info = T.as_tensor_variable(np.asarray(0, seq.dtype))
scan_result, scan_updates = theano.scan(fn=accumulate_by_adding,
                                        outputs_info=outputs_info,
                                        sequences=seq)
triangular_sequence = theano.function(inputs=[up_to], outputs=scan_result)

# test
some_num = 15
print triangular_sequence(some_num)
print [n * (n + 1) // 2 for n in xrange(some_num)]

注意するのは,

  • output_infoは関数の出力と同じshapeであること
  • output_infoで,暗示できなdowncastがあるとエラー

Another simple example

このサンプルはscanを使わないと再現が難しい.(らしい)

特に難しいことはやっていないが,symbolでmatrix, location, valuesを用意する.
scanのinner funcの中で,特定のlocationにvalueを入れたmatrixを各time stepで返却する.

another simple example
import numpy as np
import theano
import theano.tensor as T

location = T.imatrix("location")
values = T.vector("values")
output_model = T.matrix("output_model")

def set_value_at_position(a_location, a_value, output_model):
    zeros = T.zeros_like(output_model)
    zeros_subtensor = zeros[a_location[0], a_location[1]]
    return T.set_subtensor(zeros_subtensor, a_value)

result, updates = theano.scan(fn=set_value_at_position,
                              outputs_info=None,
                              sequences=[location, values],
                              non_sequences=output_model)

assign_values_at_positions = theano.function(inputs=[location, values, output_model], outputs=result)

# test
test_locations = np.asarray([[1, 1], [2, 3]], dtype=np.int32)
test_values = np.asarray([42, 50], dtype=np.float32)
test_output_model = np.zeros((5, 5), dtype=np.float32)
print assign_values_at_positions(test_locations, test_values, test_output_model)

Using shared variables - Gibbs sampling

この辺から,shared variableとupdateの関係の説明がはいってくる.
観測されている変数と隠れ変数でパラメータをシェアしている場合に,それらのレイヤー間で,サンプルとパラメータで線形変換,その結果を2項分布の平均に使ってサンプリング,そのサンプルとパラメータで線形変換,その結果を2項分布の平均に使ってまたサンプリング,している(多分 RBM系の例だと思う).

Gibbs Sampling Sample
import theano
import numpy as np
from theano import tensor as T

# Paramter initial values
W_values = np.random.rand(10, 5).astype(np.float32)
bvis_values = np.random.rand(10).astype(np.float32)
bhid_values = np.random.rand(10).astype(np.float32)

# Paramter symbols
W = theano.shared(W_values)
bvis = theano.shared(bvis_values)
bhid = theano.shared(bhid_values)

trng = T.shared_randomstreams.RandomStreams(1234)

def OneStep(vsample):
    hmean = T.nnet.sigmoid(theano.dot(vsample, W) + bhid)
    hsample = trng.binomial(size=hmean.shape, n=1, p=hmean)
    vmean = T.nnet.sigmoid(theano.dot(hsample, W.T) + bvis)
    return trng.binomial(size=vsample.shape, n=1, p=vmean,
                         dtype=theano.config.floatX)

sample = theano.tensor.vector()

values, updates = theano.scan(OneStep, outputs_info=sample, n_steps=10)

gibbs10 = theano.function([sample], values[-1], updates=updates)

print gibbs10([np.random.rand(5)])
  • updates dictionaryは後でも使える
  • updatesをfunctionに渡さないと,scanの中でupdateされる変数はupdateされない

上記例では,shared varialbesをscanに渡していないが,scanは,ちゃんと判断してくれる.最適化を考えると渡したほうがいい.その場合は,non_sequencesに渡す.

Using shared variables - the strict flag

最適化を考えると,scanの引数にshared variablesを渡したほうがいい.そこで,それを強制する引数がある.scan(..., strict=True, ...)とすると,fnで必要な,shared varialbeがちゃんと,non_sequencesに入っているかチェックしてくれる.入っていないとエラーを出す.

Multiple outputs, several taps values - Recurrent Neural Network with Scan

複雑な例.
今までは,scanの引数のsequences, output_info, non_sequencesにlist of nondictを入れていたが,dictとかlist of dictを入れる.sequencesとoutput_infoは,list of symbol or dictionaryが引数として許容されている様.

{ \displaystyle
x(n) = tanh(W x(n - 1) + W^{in}_1 u(n) + W^{in}_2 u(n - 4) + W^{feedback} y(n - 1)) \\
y(n) = W^{out} x(n - 3)
}

上記式のRNN(全く意味を持たない単なるサンプル)
import theano
import numpy as np
from theano import tensor as T

def oneStep(u_tm4, u_t, x_tm3, x_tm1, y_tm1, W, W_in_1, W_in_2, W_feedback, W_out):

    x_t = T.tanh(theano.dot(x_tm1, W) +
                 theano.dot(u_t, W_in_1) +
                 theano.dot(u_tm4, W_in_2) +
                 theano.dot(y_tm1, W_feedback))
    y_t = theano.dot(x_tm3, W_out)

    return [x_t, y_t]

W_ = np.random.rand(10, 10).astype(np.float32)
W_in_1_ = np.random.rand(10, 10).astype(np.float32)
W_in_2_ = np.random.rand(10, 10).astype(np.float32)
W_feedback_ = np.random.rand(10, 10).astype(np.float32)
W_out_ = np.random.rand(10, 10).astype(np.float32)
    
W = theano.shared(W_)
W_in_1 = theano.shared(W_in_1_)
W_in_2 = theano.shared(W_in_2_)
W_feedback = theano.shared(W_feedback_)
W_out = theano.shared(W_out_)

u = T.matrix()    # it is a sequence of vectors
x0 = T.matrix()  # initial state of x has to be a matrix, since it has to cover x[-3]
y0 = T.vector()  # y0 is just a vector since scan has only to provide y[-1]

([x_vals, y_vals], updates) = theano.scan(fn=oneStep,
                                          sequences=dict(input=u, taps=[-4, -0]),
                                          outputs_info=[dict(initial=x0, taps=[-3, -1]), y0],
                                          non_sequences=[W, W_in_1, W_in_2, W_feedback, W_out],
                                          strict=True)
# for second input y, scan adds -1 in output_taps by default

func = theano.function(inputs=[u, x0, y0], outputs=[x_vals, y_vals], updates=updates)

u_ = np.random.rand(50, 10).astype(np.float32)
x0_ = np.random.rand(5, 10).astype(np.float32)
y0_ = np.random.rand(10).astype(np.float32)

print func(u_, x0_, y0_)

注意するのは,x(n -3)まで見ているので,xは,matrix(0-dが4次元以上)でなくてはならない.過去の出力(x_t)を,x0にとってくれているのだろうか?

sequences=dict(input=u, taps[-4, 0])となっているので,uvals = [0,1,2,3,4,5,6,7,8]とした時に,u[-4]がuvals[0]となって,u[0]がuvals[4]から始まるのにも注意.

Conditional ending of Scan

symbolに対してconditionを付けられる. repeat-untilの例.

  • conditionで使える引数は,fnの引数
  • conditionはfnの戻り値の最後につける
import theano
from theano import tensor as T

def power_of_2(previous_power, max_value):
    return previous_power*2, theano.scan_module.until(previous_power*2 > max_value)

max_value = T.scalar()
values, _ = theano.scan(power_of_2,
                        outputs_info=T.constant(1.),
                        non_sequences=max_value,
                        n_steps=1024)

f = theano.function([max_value], values)

print(f(45))

Optimizing Scan’s performance

  • あまりscanは使わないようにする
  • inputは明示的にfnに入れる
  • scanでgcしない,"config.scan.allow_gc" or "allow_gc"を使う
  • LSTMとかそうだけれど,細かいmatrix mulitplicationsをするより,大きい行列にconcatしてから一回matrix mulitplicationsしたほうがメモリは食うけど早い.