Theano: Scan Op
現時点でTheano(0.7.0).いまさらながら,scan opをちゃんと見てみることにした.scan opのいいところは,variable length input/outputに対応できること,および,loopの中に,symbolに対してconditionを付けられること.
基本的に,このページを順に見ていく.
サンプルコードは,ここにある.
Simple loop with accumulation: Computing A^k
をコードで書きたいときには,普通,次の様に書く
non-scan power code
result = 1 for i in range(k): result = result * A
これを見ると,
- resultの初期値化
- 変わらない変数 A
- 関数 (この場合はaccumulation)
がある.これをscanの引数に対応させると,
- resultの初期値化: output_info
- 変わらない変数 A: non_sequences
- 関数 (この場合はaccumulation): lambda or named function
となる.これは最も簡単なケース.
上記コードを,Scanに置き換える場合
import theano import theano.tensor as T k = T.iscalar("k") A = T.vector("A") # Symbolic description of the result result, updates = theano.scan(fn=lambda prior_result, A: prior_result * A, outputs_info=T.ones_like(A), non_sequences=A, n_steps=k) # Optimization saving memory. final_result = result[-1] # Compiled function that returns A**k power = theano.function(inputs=[A, k], outputs=final_result, updates=updates) print power(range(10), 2) print power(range(10), 4)
result自体は,list of Tesnorで返ってくる.初めの次元がtime step.この例では,computational graphを作る前に,result[-1]のみをtheano.functionの引数にすることで,メモリ最適化をしている.また,n_stepsを指定しており,variable lengthではないので注意.
Iterating over the first dimension of a tensor: Calculating a polynomial
通常のfor x in a_listのloopのように(a_listがlist of list of ...を前提とする),初めの次元でイテレーションができる.その時には,scan引数のsequecencesを使用する.
多項式のサンプル
import numpy import theano import theano.tensor as T coefficients = theano.tensor.vector("coefficients") x = T.scalar("x") max_coefficients_supported = 10000 # Generate the components of the polynomial components, updates = theano.scan(fn=lambda coefficient, power, free_variable: coefficient * (free_variable ** power), outputs_info=None, sequences=[coefficients, theano.tensor.arange(max_coefficients_supported)], non_sequences=x) # Sum them up polynomial = components.sum() # Compile a function calculate_polynomial = theano.function(inputs=[coefficients, x], outputs=polynomial) # Test test_coefficients = numpy.asarray([1, 0, 2], dtype=numpy.float32) test_value = 3 print calculate_polynomial(test_coefficients, test_value) print 1.0 * (3 ** 0) + 0.0 * (3 ** 1) + 2.0 * (3 ** 2)
このサンプルで確認したいのは次の4点.
- サンプルでは多項式の項を全部返して最後に,components.sum()しているが,scanの中でcumsumしてから最後の項を取るほうがメモリ最適になる
- output_info=Noneにすると,fnの引数にprevious resultを入れないで良い
- python's enumerateのシミュレートとして,theano.thensor.arange(n)を使う.itereated indicesを使いたい場合に使うんだろう
- sequencesに同じ長さでない変数を入れると,長さが小さい方に合わさる様に打ち切られる
重要なのはscanの引数がどうfnの引数に渡るのか
sequences (if any), prior result(s) (if needed), non-sequences (if any)
のように渡る.入力,出力,それ以外と覚えればいいか.
Simple accumulation into a scalar, ditching lambda
Named Function Sample
import numpy as np import theano import theano.tensor as T up_to = T.iscalar("up_to") # define a named function, rather than using lambda def accumulate_by_adding(arange_val, sum_to_date): return sum_to_date + arange_val seq = T.arange(up_to) # An unauthorized implicit downcast from the dtype of 'seq', to that of # 'T.as_tensor_variable(0)' which is of dtype 'int8' by default would occur # if this instruction were to be used instead of the next one: # outputs_info = T.as_tensor_variable(0) outputs_info = T.as_tensor_variable(np.asarray(0, seq.dtype)) scan_result, scan_updates = theano.scan(fn=accumulate_by_adding, outputs_info=outputs_info, sequences=seq) triangular_sequence = theano.function(inputs=[up_to], outputs=scan_result) # test some_num = 15 print triangular_sequence(some_num) print [n * (n + 1) // 2 for n in xrange(some_num)]
注意するのは,
- output_infoは関数の出力と同じshapeであること
- output_infoで,暗示できなdowncastがあるとエラー
Another simple example
このサンプルはscanを使わないと再現が難しい.(らしい)
特に難しいことはやっていないが,symbolでmatrix, location, valuesを用意する.
scanのinner funcの中で,特定のlocationにvalueを入れたmatrixを各time stepで返却する.
another simple example
import numpy as np import theano import theano.tensor as T location = T.imatrix("location") values = T.vector("values") output_model = T.matrix("output_model") def set_value_at_position(a_location, a_value, output_model): zeros = T.zeros_like(output_model) zeros_subtensor = zeros[a_location[0], a_location[1]] return T.set_subtensor(zeros_subtensor, a_value) result, updates = theano.scan(fn=set_value_at_position, outputs_info=None, sequences=[location, values], non_sequences=output_model) assign_values_at_positions = theano.function(inputs=[location, values, output_model], outputs=result) # test test_locations = np.asarray([[1, 1], [2, 3]], dtype=np.int32) test_values = np.asarray([42, 50], dtype=np.float32) test_output_model = np.zeros((5, 5), dtype=np.float32) print assign_values_at_positions(test_locations, test_values, test_output_model)
Using shared variables - Gibbs sampling
この辺から,shared variableとupdateの関係の説明がはいってくる.
観測されている変数と隠れ変数でパラメータをシェアしている場合に,それらのレイヤー間で,サンプルとパラメータで線形変換,その結果を2項分布の平均に使ってサンプリング,そのサンプルとパラメータで線形変換,その結果を2項分布の平均に使ってまたサンプリング,している(多分 RBM系の例だと思う).
Gibbs Sampling Sample
import theano import numpy as np from theano import tensor as T # Paramter initial values W_values = np.random.rand(10, 5).astype(np.float32) bvis_values = np.random.rand(10).astype(np.float32) bhid_values = np.random.rand(10).astype(np.float32) # Paramter symbols W = theano.shared(W_values) bvis = theano.shared(bvis_values) bhid = theano.shared(bhid_values) trng = T.shared_randomstreams.RandomStreams(1234) def OneStep(vsample): hmean = T.nnet.sigmoid(theano.dot(vsample, W) + bhid) hsample = trng.binomial(size=hmean.shape, n=1, p=hmean) vmean = T.nnet.sigmoid(theano.dot(hsample, W.T) + bvis) return trng.binomial(size=vsample.shape, n=1, p=vmean, dtype=theano.config.floatX) sample = theano.tensor.vector() values, updates = theano.scan(OneStep, outputs_info=sample, n_steps=10) gibbs10 = theano.function([sample], values[-1], updates=updates) print gibbs10([np.random.rand(5)])
- updates dictionaryは後でも使える
- updatesをfunctionに渡さないと,scanの中でupdateされる変数はupdateされない
上記例では,shared varialbesをscanに渡していないが,scanは,ちゃんと判断してくれる.最適化を考えると渡したほうがいい.その場合は,non_sequencesに渡す.
Using shared variables - the strict flag
最適化を考えると,scanの引数にshared variablesを渡したほうがいい.そこで,それを強制する引数がある.scan(..., strict=True, ...)とすると,fnで必要な,shared varialbeがちゃんと,non_sequencesに入っているかチェックしてくれる.入っていないとエラーを出す.
Multiple outputs, several taps values - Recurrent Neural Network with Scan
複雑な例.
今までは,scanの引数のsequences, output_info, non_sequencesにlist of nondictを入れていたが,dictとかlist of dictを入れる.sequencesとoutput_infoは,list of symbol or dictionaryが引数として許容されている様.
上記式のRNN(全く意味を持たない単なるサンプル)
import theano import numpy as np from theano import tensor as T def oneStep(u_tm4, u_t, x_tm3, x_tm1, y_tm1, W, W_in_1, W_in_2, W_feedback, W_out): x_t = T.tanh(theano.dot(x_tm1, W) + theano.dot(u_t, W_in_1) + theano.dot(u_tm4, W_in_2) + theano.dot(y_tm1, W_feedback)) y_t = theano.dot(x_tm3, W_out) return [x_t, y_t] W_ = np.random.rand(10, 10).astype(np.float32) W_in_1_ = np.random.rand(10, 10).astype(np.float32) W_in_2_ = np.random.rand(10, 10).astype(np.float32) W_feedback_ = np.random.rand(10, 10).astype(np.float32) W_out_ = np.random.rand(10, 10).astype(np.float32) W = theano.shared(W_) W_in_1 = theano.shared(W_in_1_) W_in_2 = theano.shared(W_in_2_) W_feedback = theano.shared(W_feedback_) W_out = theano.shared(W_out_) u = T.matrix() # it is a sequence of vectors x0 = T.matrix() # initial state of x has to be a matrix, since it has to cover x[-3] y0 = T.vector() # y0 is just a vector since scan has only to provide y[-1] ([x_vals, y_vals], updates) = theano.scan(fn=oneStep, sequences=dict(input=u, taps=[-4, -0]), outputs_info=[dict(initial=x0, taps=[-3, -1]), y0], non_sequences=[W, W_in_1, W_in_2, W_feedback, W_out], strict=True) # for second input y, scan adds -1 in output_taps by default func = theano.function(inputs=[u, x0, y0], outputs=[x_vals, y_vals], updates=updates) u_ = np.random.rand(50, 10).astype(np.float32) x0_ = np.random.rand(5, 10).astype(np.float32) y0_ = np.random.rand(10).astype(np.float32) print func(u_, x0_, y0_)
注意するのは,x(n -3)まで見ているので,xは,matrix(0-dが4次元以上)でなくてはならない.過去の出力(x_t)を,x0にとってくれているのだろうか?
sequences=dict(input=u, taps[-4, 0])となっているので,uvals = [0,1,2,3,4,5,6,7,8]とした時に,u[-4]がuvals[0]となって,u[0]がuvals[4]から始まるのにも注意.
Conditional ending of Scan
symbolに対してconditionを付けられる. repeat-untilの例.
- conditionで使える引数は,fnの引数
- conditionはfnの戻り値の最後につける
import theano from theano import tensor as T def power_of_2(previous_power, max_value): return previous_power*2, theano.scan_module.until(previous_power*2 > max_value) max_value = T.scalar() values, _ = theano.scan(power_of_2, outputs_info=T.constant(1.), non_sequences=max_value, n_steps=1024) f = theano.function([max_value], values) print(f(45))