大家好,
今天想和大家讲讲sklearn,我在处理 30W 行数据做分类的时候发现 sklearn 对多核工作站支持效果不是太好,我使用一个 20 核 E5 工作站居然还没有我笔记本速度快,看了一下发现 sklearn 没有充分使用 64G 内存和 CPU 的多核。 这里编写了一个 sklearn 的 SVM 类,可以通过占据更多的计算资源对 SVM 加速进行。
训练 fit(trainX, trainY)
预测 pred(testX)
学习步长 learning_rate = 0.001
训练周期 training_epoch = None,
终止误差 error = 0.001,
if training_epoch is not None then the error will not effect
显示步长 display_step = 5
高斯核定义: gamma
K(x)=exp(γxxT)
Gaussian RBF
import tensorflow as tf import functools def lazy_property(function): attribute = '_' + function.__name__ @property @functools.wraps(function) def wrapper(self): if not hasattr(self, attribute): setattr(self, attribute, function(self)) return getattr(self, attribute) return wrapper class NonlinearSVC(object): def __init__(self, learning_rate = 0.001, training_epoch = None, error = 0.001, display_step = 5): self.learning_rate = learning_rate self.training_epoch = training_epoch self.display_step = display_step self.error = error def __Preprocessing(self, trainX): row = trainX.shape[0] col = trainX.shape[1] self.X = tf.placeholder(shape=[row, col], dtype= tf.float32) self.Y = tf.placeholder(shape=[row, 1], dtype= tf.float32) self.test = tf.placeholder(shape=[None, col], dtype= tf.float32) self.beta = tf.Variable(tf.truncated_normal(shape=[1, row], stddev=.1)) @lazy_property def Kernel_Train(self): tmp_abs = tf.reshape(tensor=tf.reduce_sum(tf.square(self.X), axis=1), shape=[-1,1]) tmp_ = tf.add(tf.sub(tmp_abs, tf.mul(2., tf.matmul(self.X, tf.transpose(self.X)))), tf.transpose(tmp_abs)) return tf.exp(tf.mul(self.gamma, tf.abs(tmp_))) @lazy_property def Kernel_Prediction(self): tmpA = tf.reshape(tf.reduce_sum(tf.square(self.X), 1),[-1,1]) tmpB = tf.reshape(tf.reduce_sum(tf.square(self.test), 1),[-1,1]) tmp = tf.add(tf.sub(tmpA, tf.mul(2.,tf.matmul(self.X, self.test, transpose_b=True))), tf.transpose(tmpB)) return tf.exp(tf.mul(self.gamma, tf.abs(tmp))) @lazy_property def Cost(self): left = tf.reduce_sum(self.beta) beta_square = tf.matmul(self.beta, self.beta, transpose_a=True) Y_square = tf.matmul(self.Y, self.Y, transpose_b= True) right = tf.reduce_sum(tf.mul(self.Kernel_Train, tf.mul(beta_square, Y_square))) return tf.neg(tf.sub(left, right)) @lazy_property def Prediction(self): kernel_out = tf.matmul(tf.mul(tf.transpose(self.Y),self.beta), self.Kernel_Prediction) return tf.sign(kernel_out - tf.reduce_mean(kernel_out)) @lazy_property def Accuracy(self): return tf.reduce_mean(tf.cast(tf.equal(tf.squeeze(self.Prediction), tf.squeeze(self.Y)), tf.float32)) def fit(self, trainX, trainY, gamma= 50.): self.sess = tf.InteractiveSession() self.__Preprocessing(trainX) self.gamma = tf.constant(value= -gamma, dtype=tf.float32) #self.optimizer = tf.train.ProximalGradientDescentOptimizer(self.learning_rate).minimize(self.Cost) self.optimizer = tf.train.AdamOptimizer(self.learning_rate).minimize(self.Cost) self.sess.run(tf.global_variables_initializer()) if self.training_epoch is not None: for ep in range(self.training_epoch): self.sess.run(self.optimizer, feed_dict={self.X:trainX, self.Y:trainY}) if ep % self.display_step== 0: loss, acc = self.sess.run([self.Cost, self.Accuracy], feed_dict={self.X:trainX, self.Y:trainY, self.test:trainX}) print ('epoch=',ep,'loss= ',loss, 'accuracy= ', acc) elif self.training_epoch is None: acc = 0.1 ep = 0 while (acc< 1.- self.error): acc,_ = self.sess.run([self.Accuracy, self.optimizer], feed_dict={self.X:trainX, self.Y:trainY, self.test:trainX}) ep += 1 if ep % self.display_step== 0: loss = self.sess.run(self.Cost, feed_dict={self.X:trainX, self.Y:trainY}) print ('epoch=',ep,'loss= ',loss, 'accuracy= ', acc) def pred(self, test): output = self.sess.run(self.Prediction, feed_dict={self.X:trainX, self.Y:trainY, self.test:trainX}) return output
Linear SVM
import tensorflow as tf import functools def lazy_property(function): attribute = '_' + function.__name__ @property @functools.wraps(function) def wrapper(self): if not hasattr(self, attribute): setattr(self, attribute, function(self)) return getattr(self, attribute) return wrapper class LinearSVC(object): def __init__(self, learning_rate = 0.001, training_epoch = None, error = 0.001, display_step = 5): self.learning_rate = learning_rate self.training_epoch = training_epoch self.display_step = display_step self.error = error def __Preprocessing(self, trainX): row = trainX.shape[0] col = trainX.shape[1] self.X = tf.placeholder(shape=[row, col], dtype= tf.float32) self.Y = tf.placeholder(shape=[row, 1], dtype= tf.float32) self.test = tf.placeholder(shape=[None, col], dtype= tf.float32) self.beta = tf.Variable(tf.truncated_normal(shape=[1, row], stddev=.1)) @lazy_property def Kernel_Train(self): return tf.matmul(self.X, self.X, transpose_b=True) @lazy_property def Kernel_Prediction(self): return tf.matmul(self.X, self.test, transpose_b=True) @lazy_property def Cost(self): left = tf.reduce_sum(self.beta) beta_square = tf.matmul(self.beta, self.beta, transpose_a=True) Y_square = tf.matmul(self.Y, self.Y, transpose_b= True) right = tf.reduce_sum(tf.mul(self.Kernel_Train, tf.mul(beta_square, Y_square))) return tf.neg(tf.sub(left, right)) @lazy_property def Prediction(self): kernel_out = tf.matmul(tf.mul(tf.transpose(self.Y),self.beta), self.Kernel_Prediction) return tf.sign(kernel_out - tf.reduce_mean(kernel_out)) @lazy_property def Accuracy(self): return tf.reduce_mean(tf.cast(tf.equal(tf.squeeze(self.Prediction), tf.squeeze(self.Y)), tf.float32)) def fit(self, trainX, trainY, gamma= 50.): self.sess = tf.InteractiveSession() self.__Preprocessing(trainX) self.gamma = tf.constant(value= -gamma, dtype=tf.float32) #self.optimizer = tf.train.ProximalGradientDescentOptimizer(self.learning_rate).minimize(self.Cost) self.optimizer = tf.train.AdamOptimizer(self.learning_rate).minimize(self.Cost) self.sess.run(tf.global_variables_initializer()) if self.training_epoch is not None: for ep in range(self.training_epoch): self.sess.run(self.optimizer, feed_dict={self.X:trainX, self.Y:trainY}) if ep % self.display_step== 0: loss, acc = self.sess.run([self.Cost, self.Accuracy], feed_dict={self.X:trainX, self.Y:trainY, self.test:trainX}) print ('epoch=',ep,'loss= ',loss, 'accuracy= ', acc) elif self.training_epoch is None: acc = 0.1 ep = 0 while (acc< 1.- self.error): acc,_ = self.sess.run([self.Accurac, self.optimizer], feed_dict={self.X:trainX, self.Y:trainY, self.test:trainX}) ep += 1 if ep % self.display_step== 0: loss = self.sess.run(self.Cost, feed_dict={self.X:trainX, self.Y:trainY}) print ('epoch=',ep,'loss= ',loss, 'accuracy= ', acc) def pred(self, test): output = self.sess.run(self.Prediction, feed_dict={self.X:trainX, self.Y:trainY, self.test:trainX}) return output
想看更多源代码的小伙伴们可以戳这个链接: https://uqer.io/community/share/58a147dfc1e3cc00567fde4d