机器怎样学习:使用概率统计的视角

视角的转换:预测取值还是预测分布

泛泛而论,机器学习就是给定一个输入 x,预测一个输出值 y,或者说,逼近目标函数 y = f(x)。回归模型接受输入 x,线性地预测一个输出 y。图像分类器将每个输入的图像分到一个类别中。

如果说上面的论述你全盘接受了的话,那么你本应该提出若干补充和质疑的。卷积神经网络的图像分类器,如果有 C 类可供选择,其输出层实际上是一排共 C 个 logits,代表图片属于每一类的概率的对数。回归模型虽然输出的确实是一个数,但它也可以被看成给定输入 x 时变量 y 服从的分布:P(y|x) = Gaussian(ax+b, sigma^2)。

甚至 “给定一个输入,预测一个输出” 这个模板也可以被打破:通过观测未知分布的变量 X 的取值 X1, X2, …, Xm, 得出一个概率模型,从而对概率密度函数求值,或者是采样重建 X。我能想到的最简单的例子(例子A):未知分布是服从二项分布的随机变量 X,观察到 10 个样本 0 0 1 1 0 0 0 1 0 0,我们可以建立如下模型:“P(X=0)=0.7, P(X=1)=0.3”,乃至从这个重建的分布里采样:1 0 0 0 1 0 …

再举一个更实用的例子:比如说 X = (p,q,r) 是自然发生的、连续 3 个汉字组成的输入空间,假若我们能够建模 P(X),那么通过对 P(r|p,q) = P(p,q,r) / P(p,q) 求值,我们就得到了下一字预测模型,可以辅助拼音输入法。

这就是机器学习的概率统计视角:机器学习通过样本重建分布,从而预测。

【2024年10月9日更新】不适用本视角的场景:利用语言模型将文档编码为稠密向量的 Dual-Encoder 模型,它希望问询和答案文档的表示更接近(余弦相似意义上)。这就属于表示学习(representation learning)的范畴了。

如何衡量重建分布的好坏

KL 散度

我们重建的分布 (下面记作 P_model) 在多大程度上还原了真实分布 (下面记作 P_data)?我们可以通过 KL 散度作为指标:

D_KL(P_data || P_model)
= E_{X~P_data} [log(P_data(X)/P_model(X))]
= E_{X~P_data}log(1/P_model(X)) - E_{X~P_data} log(1/P_data(X))
= Cross Entropy - Entropy

这个指标是非负的,在重建的分布等于真实分布时等于 0,而偏离真实分布越远,指标就越大。将 log 里面的除法拆开,第一项被称为“交叉熵”,而第二项恰好就是真实分布的信息熵,其含义为真实分布本身蕴含的不确定性。KL 散度等于交叉熵减去真实分布的熵。

/|\
 |    ----+---+
 |       /|\   \
 |D_KL -> |     +----- Cross Entropy
 |       \|/
 |    ----+----------- Entropy
0+---------------------> Choice of Model

当真实分布的形式已知的时候,我们可以直接计算指标中的各项。比如上一节的例子A里,假设真实分布是“P(X=0)=0.6, P(X=1)=0.4”,那么真实分布的信息熵就是 0.6*log(1/0.6) + 0.4*log(1/0.4) = 0.673;而我们建立的模型是“P(X=0)=0.7, P(X=1)=0.3”,因此交叉熵是 0.6*log(1/0.7) + 0.4*log(1/0.3) = 0.696,KL 散度是 0.696-0.673 = 0.023。

建立模型的我们,可以控制 P_model,但无法控制 P_data。我们希望 KL 散度越小越好,而 KL 散度取决于我们可以改变的交叉熵和我们控制范围之外的真实分布的熵。交叉熵最小的模型,就是 KL 散度最小的模型,就是最接近真实分布的模型。

真实分布:不能求值,但能采样

实际的机器学习问题中,我们不知道真实分布,但可以从真实分布中采样(收集数据)。这种情况下,无法对 P_data 求值的我们没有显然、容易的办法估算真实分布的熵或 KL 散度。但是,交叉熵恰好可以使用(从 P_data 重要性采样的)蒙特卡洛积分估算:

Cross Entropy
= E_{X~P_data}log(1/P_model(X))
= \Int_{X in Omega} P_data(X)*log(1/P_model(X))
≈ (1/m) \Sigma_{i=1}^m (P_data(X_i)*log(1/P_model(X_i)))/P_data(X_i)
= (1/m)\Sigma_{i=1}^m log(1/P_model(X_i))
= Cross Entropy Loss
X1, X2, ..., Xm are sampled from the distribution P_data

可以看到,表达式中已经没有了我们未知的 P_data 项,这就是交叉熵损失函数 (Cross Entropy Loss)。只要样本 X1,…,Xm 没有被用于改变 P_model (换言之,在测试集中),那么交叉熵损失就是交叉熵的无偏、一致估计。

条件 KL 散度

在实际的机器学习问题中,模型的架构往往是给定输入 X ,估算 Y 的条件分布 P(Y|X),例如给定一张图片 X,估算其类别的条件分布。模型无法估计,也不关心输入的分布 P(X)。然而,真实分布仍然是一个联合分布 P(X,Y)。针对这种情况,我们可以使用 “条件 KL 散度”:

Conditional KL Divergence
= E_{(X,Y)~P_data} [log(P_data(Y|X)/P_model(Y|X))]
= E_{(X,Y)~P_data} [1/P_model(Y|X)] - E_{(X,Y)~P_data} [1/P_data(Y|X)]
= Contitional Cross Entropy - Conditional Entropy

仍然把表达式拆成两项,前一项不妨称为“条件交叉熵”,而后一项恰好是真实分布的基于X条件的Y的信息熵。“条件 KL 散度”衡量了我们重建的条件分布有多接近真实的条件分布。

想要低的 KL 散度,该怎么办?

非参数化方法

答:通过观测到的样本,直接应用若干规则,上手构建一个好的模型。

最直截了当的办法就是密度估计。事实上,在例子A里面,我们已经用到了离散的密度估计:我们数了数10个样本里有7个0,3个1,就大胆宣称“P(X=0)=0.7, P(X=1)=0.3”。样本是有限的,为了避免出现某一类不出现导致概率为0,KL 散度变成无穷的情况,还是在频率的基础上做一些调整为好。

连续的情况下,我们可以使用 Kernel Density Estimation (KDE) 的办法给出模型概率(P_model):等于以各个样本为中心的正态分布的混合。正态分布的 sigma 如何确定?抱歉,这是一个需要调整的超参数。只需要估计条件概率?没问题,只要用 KDE 估计 P(X,Y),然后应用 P(Y|X) = P(X,Y)/P(X) 就可以了,只不过我们顺带着也把不需要的 P(X) 估计了,有点浪费(见演示1)。

更多非参数化方法的例子包括 KNN、决策树等等。这些方法都没有对 P_model 的形式作任何参数化的假设,这导致:一方面,当见到的样本数足够多,P_model 似乎可以无限接近 P_data;另一方面,需要大量的样本才能准确,在输入空间维数爆炸的情况下,这些方法不太可用。

参数化方法

答:先用固定数目的参数以一定形式构建 P_model,然后调整参数最小化某个损失函数(实际上执行最大似然估计)。

【2024年10月9日更新】很多生成式模型是不会显式输出 P_model 的(X 的定义域不是若干类别而是高维空间)。Boltzmann 机好歹有一个能量方程,GAN 似乎没有建模 P_model。

假设损失函数是 Cross Entropy Loss。在已见样本上最小化交叉熵的估计(也就是训练集上的 Cross Entropy Loss),为什么能够达到降低真正的交叉熵(用未见样本上的 Cross Entropy Loss 来估计)的目标?对我而言,这个问题并不显然。

解决这个疑惑的关键在于意识到 Cross Entropy Loss 是似然函数的负对数;最小化 Cross Entropy Loss 就是最大化似然函数,也就是说,给出参数的最大似然估计 (Maximum Likelihood Estimation, MLE)

-log(Likelihood)
= -log(\Prod_{i=1}^m P_model(X_i))
= \Sigma_{i=1}^m log(1/P_model(X_i))
= Cross Entropy Loss

最大似然估计是统计学中研究相当丰富的参数估计方法,具有一致性。假如某个参数能够使得 P_model 完全吻合真实的分布,那么随着样本数的增多,最大似然估计给出的参数无限接近真实的参数,也就是说,P_model 会无限接近 P_data(见演示2)。

最大似然估计更好的性质在于,即使我们的模型不等于真实分布(称这种情形为 misspecified model),最大似然估计给出的参数,必然能够使得 P_model 的 KL 散度最小(演示3)。

最大似然估计的启示是,选定模型的条件下,优化训练集上的交叉熵损失能够产生最好的模型(嗯,在样本数量无限的理想情况下)。

不同视角下的模型泛化

模型作为函数拟合器的视角:决定性框架

Machine Learning 一书的第二章描述了概念学习 (Concept Learning) 这个早在 1967 年就被提出的任务。该章节所举的例子大多采用了决定性(Deterministic)的分析框架。我们想要的是学习一个决定性的函数,而概念上存在一个我们事先不知道的、正确的目标函数。

理所当然地,我们看到的全部样本都是完美服从目标函数的。有(天气晴朗、风很大、浪很急,EnjoySport=True)这样一个样本,就绝不会在后来又出现一个(天气晴朗、风很大、浪很急,EnjoySport=False)的样本。我们可以在样本上很自然地定义一个模型函数和目标函数的误差。

针对此类决定性的学习器为什么可以泛化(“为什么模型没有死记硬背全部训练数据了事?”、“训练集上的误差小于测试集上的误差,但测试集上的误差到底几何?”),适用的解释是归纳学习假设:

Any hypothesis found to approximate the target function well over a sufficiently large set of training examples will also approximate the target function well over other unobserved examples.

翻译:在足够大的训练集上良好逼近目标函数的假设,在未见样本上也能良好逼近目标函数。

这是一个通用的、非定量的(多好才叫 approximate well?)、假设(并没有依据)。

针对概念学习的定量理论(2024年7月7日更新)

针对概念学习,也有一套定量理论:计算学习理论(Computational Learning Theory),涉及VC维、PAC(可能近似正确学习)等。概念学习并非和概率统计绝缘,虽然目标函数是确定性的,但是样本的产生是服从一个分布 D 的。首先定义误差:

error_D(h) = P_{x ~ D}[c(x) != h(x)]

对于有限假设空间中的一致学习器(Consistent Learner,训练集上误差为0),不难证明如下结论:

任意给定一个假设 h,假若其误差大于 eps,它是一致学习器的概率不超过

P[Consistent(h,S)]
= (1-error_D(h))^m
<= (1-eps)^m
where S is the training set {x1, ..., xm}, xi ~ D

因此,存在实际误差大于 eps 的一致学习器 h 的概率不超过

|H|*(1-eps)^m <= |H|*exp{-eps*m}

对于有限假设空间中的不可知学习器(注:指允许训练集上误差大于 0),任意给定假设 h,依据Hoeffding边界,实际误差大于训练集误差加上 eps 的假设的概率满足

P[error_D(h)>error_S(h)+eps]
= P[m*error_S(h)<m*error_D(h)-m*eps]
= P[X1+X2+...+Xm < E[X1+X2+...+Xm]-m*eps] (Hoeffding's inequality)
<= exp^{2*(m*eps)**2/m}
= exp^{-2*m*eps**2}
Here, let Xi denote 1 if h(xi)==c(xi) on the ith training sample xi else 0

这里之所以用 Hoeffding’s Inequality 而非 Central Limit Theorem 估计偏离的概率,是因为 1) Hoeffding 对 m 的大小没有要求,而 CLT 是 m 趋于无穷时的一个近似;2)Hoeffding 得到的结果是一个简单的指数形式,方便后续运算,而 CLT 得到的结果用高斯累积分布函数的形式表示,难以进行后续运算。

因此,存在泛化误差(误差减去训练集误差)大于 eps 的假设 h 的概率不超过

|H|*exp^{-2*m*eps**2}

这显然是一个关于泛化能力的保证:只要训练样本数相对于假设空间足够大,那么我们就以可以任意好的概率和精度泛化。

最后,对于无限维的假设空间,类似的保证也存在,可能近似正确学习需要的样本数正比于假设空间的 VC 维度。然而,对于决策树、神经网络之类可以打散任何样本空间中的集合的假设空间,其 VC 维度是无穷的,乍一看,这套定量理论无法对其给出泛化能力的任何保证。

上述观点被 Ilya 在他的 Talk 中一针见血地反驳:实际代码实现的决策树、神经网络等等模型都使用有限精度的浮点数,因此模型参数的组合种类数(假设空间)是有限的,不适用VC维理论,反而适用有限假设空间中的不可知学习器的结论。

作一个笔尖上的估计,假设模型参数占用 b 字节的内存,那么首先不难获知 |H| = 2^{8*b}。要让存在泛化误差大于 eps 的假设 h 的概率不超过 delta,需要多少训练样本?

|H|*eps^{-2*m*eps**2} < delta
<=>
m
> 1/(2*eps**2)*(ln(|H|)-ln(delta))
≈ 1/(2*eps**2)*ln(|H|) since |H| >> 1/delta
= 4*ln(2)*ln(b)/eps**2
≈ 2.77*ln(b)/eps**2

例如,要让泛化误差小于 5%,则需要 1109*b 个训练样本的条件下才能保证。可以看出,这是一个相当松的界限(按一个规格 2 输入单元,5 隐藏单元,2 输出单元,使用 float32 的三层 MLP 来计算,b=27*4=108,需要十万个样本),在实验中很少会被触碰到。(见演示5

参考:
张敏 机器学习概论 讲义
An Observation on Generalization by Ilya Sutskever at Simons Institute.

模型重建分布的视角:概率性框架

采用本文着力阐述的概率性框架,模型甚至没有“泛化”的过程。这里并没有试图通过“在训练集上表现良好”推断出“在测试集上表现良好”的跳跃/归纳过程。相反,在刚开始选定模型架构之后、初始化参数之前,关于模型的性能就有了一个一步到位的保证:只要保证参数使得训练集 Loss 达到最小,训练数据足够多,那么 KL 散度就会达到同样架构的模型中的最小。

Q:一般来说训练集上的 Loss 小于测试集上的 Loss,两者之间的关系具体是怎样的?
A:两者本身就不具有可比性。训练集上的损失函数,只是用来梯度更新模型参数的工具,我们不太关心。
Q:为什么模型没有死记硬背全部训练数据了事?
A:即使这发生了,也不会影响关于 KL 散度的保证。

将训练集上的 Cross ENTROPY 与实际值关联起来的定量理论(2024年7月8日更新)

与针对概念学习的定量理论类似,我们也可以给出训练集上交叉熵与实际交叉熵之间距离的上界。写出假设 h 对应的实际交叉熵、在单个样本上产生的交叉熵、和在训练集上的交叉熵

ce(h) = E_{X~P_data}log(1/P_model(X|h))
ce(X_i, h) = log(1/P_model(X_i|h))
ce_S(h) = (1/m)\Sigma_{i=1}^m log(1/P_model(X_i|h)) = average(ce(X_i,h))

h 在单个样本上产生的交叉熵,其期望就是ce(h),其方差未知,设为 sigma^2。其在训练集上的平均值就是训练集上的交叉熵损失。

根据中心极限定理,训练集上的交叉熵近似地服从正态分布:ce_S(h) ~ N(ce(h), sigma^2 / m)。

统计学上,对于随机变量 X~N(mu, sigma^2),其平均值偏离期望值的概率,有如下上界

Pr[X-mu \geq t] \leq exp{-t^2/(2*sigma^2)}

因此,

P[ce(h)>ce_S(h)+eps]
= P[ce_S(h) - ce(h) \leq -eps] (approximately)
<= exp^{-m*eps^2 / (2*sigma^2)}

因此,存在假设 h 使得其实际交叉熵比训练集上的交叉熵高出 eps 的概率不超过

|H|*exp^{-m*eps^2 / (2*sigma^2)}

模型的位宽/占用内存大小/自由度决定了 |H| 的指数部分,训练样本数量 m 也在上面式子的指数部分。这说明即使是重建分布,以KL散度或交叉熵来衡量成果的学习,只要训练样本数超过模型的字节数/自由度,那么训练集上的指标和实际指标之间的间距是有上界保证的。

2024年8月5日更新:任意给定模型/假设 h,它在从真实分布中采样得到的样本上产生的交叉熵,其方差并不好给出上界,这个上界可能与 h 有关。这可能意味着上面的推论中 sigma^2 不是一个良定义的量。
为了解决这个问题,可以“强迫”每个模型在重建分布时都叠加一层 uniform 分布,P_model <- (1-alpha) * P_model + alpha * P_uniform,这样依据问题的定义域,我们可以给出交叉熵的上界 U。例如,确定了语言模型任务中单词/token表长度为 V 后,P_uniform 就是 P_uniform(x) = 1/V,不难证明交叉熵的上界就是 U = log(V/alpha)。
这样,样本上的交叉熵被 [0, U] 的范围内,依据这个条件,我们可以使用 Hoeffding 边界完成推理(与上一节的推理步骤相同)

代码演示

kde-mle.py

演示1:demonstrate_kde() 函数:真实分布是两个高斯纷分布的混合分布,通过 Kernel Density Estimation 估计之。计算估计的 KL 散度和条件 KL 散度。可视化真实分布 vs 模型分布。

演示2:demonstrate_mle_consistency() 函数:真实分布是线性的,使用线性回归(MLE 的一种)估算真实分布的参数,估计得到的参数逼近真实的参数。

演示3:demonstrate_mle_misspecified() 函数:真实分布是两个高斯分布的混合分布,通过线性回归估计之。当样本数足够大,估计得到的参数使得条件 KL 散度在线性回归模型中达到最小(代码中只验证了局部极小)

xor-nn.py

演示4:test_nn_optimality() 函数(2024年6月24日更新):演示3中“得到的参数使得条件 KL 散度在同模型中达到最小“这个现象在神经网络中不会出现。如果对训练好的神经网络的参数作随机的微扰(这个微扰的方法有讲究),会有将近一半(可能多于一半,也可能少于一半)的扰动后网络 KL 散度低于原先的神经网络。这可能是因为神经网络的损失函数是非凸的、有大量局部极小点的,训练时使用的也是梯度下降的优化算法;与之相对的,线性回归模型的损失函数是凸的,直接可以求得解析解。

演示5:test_generalization_bound() 函数(2024年7月7日更新):真实分布是一个带噪声的异或函数,计算学习理论给出的关于泛化误差 eps 的上限估计很松,在实践中不会被触碰到。

附录:代码

kde-mle.py

import math
import random
from bisect import bisect
import matplotlib.pyplot as plt
import numpy as np

class Gaussian1D:
    def __init__(self, mu, sigma):
        self.dim = 1
        self.mu = mu
        self.sigma = sigma
    def evaluate(self, x):
        return math.exp(-(x-self.mu)**2/2/self.sigma**2) / math.sqrt(2*math.pi) / self.sigma
    def sample(self):
        return random.gauss(self.mu, self.sigma)
    def get_limits(self):
        return [self.mu-3*self.sigma], [self.mu+3*self.sigma]

class Gaussian2D:
    def __init__(self, ux, uy, sigma_x, sigma_y, rho):
        """Covariance Matrix: 
        [sigma_x**2,  rho*sigma_x*sigma_y,\\ rho*sigma_x*sigma_y, sigma_y**2]
        """
        self.dim = 2
        self.ux, self.uy = ux, uy
        self.sigma_x, self.sigma_y = sigma_x, sigma_y
        self.rho = rho
    def evaluate(self, xy):
        x,y = xy
        c = 2*math.pi*self.sigma_x*self.sigma_y*math.sqrt(1-self.rho**2)
        c_exp = -1/(2*(1-self.rho**2))
        x_prime = (x-self.ux)/self.sigma_x
        y_prime = (y-self.uy)/self.sigma_y
        return (1/c)*math.exp(c_exp*(x_prime**2+y_prime**2-2*self.rho*x_prime*y_prime))
    def sample(self):
        # X ~ N(mu, Sigma)
        # <=>
        # X = AZ + mu, Z ~ N(0,1) i.i.d.
        # where Sigma = AA^T. (Normal Random Vector Definition)
        # 
        # Here, A = [sigma_x, 0,\\ rho*sigma_y, sqrt(1-rho**2)*sigma_y]
        z_x = random.gauss(0,1)
        z_y = random.gauss(0,1)
        x = self.sigma_x*z_x
        y = self.rho*self.sigma_y*z_x + math.sqrt(1-self.rho**2)*self.sigma_y*z_y
        return (x+self.ux, y+self.uy)
    def marginal_over_x(self):
        # To obtain the marginal distribution over a subset of variables, just drop the irrelevant ones
        return Gaussian1D(self.ux, self.sigma_x)
    def condition_on_x(self, x_value):
        mu = self.uy + self.sigma_y/self.sigma_x*self.rho*(x_value-self.ux)
        sigma = math.sqrt(1-self.rho**2)*self.sigma_y
        return Gaussian1D(mu, sigma)
    def get_limits(self):
        return [self.ux-3*self.sigma_x, self.uy-3*self.sigma_y], \
                [self.ux+3*self.sigma_x, self.uy+3*self.sigma_y]

class UniformXGaussianY:
    def __init__(self, a, b, sigma, xmin, xmax):
        """ x ~ Uniform[xmin, xmax], y ~ Gaussian(ax+b, sigma)
        """
        self.dim = 2
        self.a, self.b = a, b
        self.sigma = sigma
        self.xmin, self.xmax = xmin, xmax
    def evaluate(self, xy):
        # P(x, y) = P(y|x) P(x)
        x,y = xy
        return 1/(self.xmax-self.xmin) * Gaussian1D(self.a*x+self.b, self.sigma).evaluate(y)
    def sample(self):
        x = random.uniform(self.xmin, self.xmax)
        y = random.gauss(self.a*x+self.b, self.sigma)
        return (x, y)
    def condition_on_x(self, x_value):
        return Gaussian1D(self.a*x_value+self.b, self.sigma)
    def get_limits(self):
        if self.a > 0:
            return [self.xmin, self.xmin*self.a+self.b-3*self.sigma], [self.xmax, self.xmax*self.a+self.b+3*self.sigma]
        else:
            return [self.xmin, self.xmax*self.a+self.b-3*self.sigma], [self.xmax, self.xmin*self.a+self.b+3*self.sigma]

class MixtureDistribution:
    def __init__(self, probs, distributions):
        self.probs = probs
        self.distributions = distributions
        self.dim = self.distributions[0].dim
        for d in self.distributions:
            assert self.dim==d.dim
        self.n = len(probs)
        self.cum = [0.0] * (self.n+1)
        for i in range(self.n):
            self.cum[i+1] = self.cum[i] + self.probs[i]
        assert abs(self.cum[-1]-1) < 1e-5
    def sample(self):
        id = bisect(self.cum, random.random()) - 1
        return self.distributions[id].sample()
    def evaluate(self, coord):
        p = 0.0
        for i in range(self.n):
            p += self.probs[i] * self.distributions[i].evaluate(coord)
        return p
    def marginal_over_x(self):
        return MixtureDistribution(self.probs, [d.marginal_over_x() for d in self.distributions])
    def condition_on_x(self, x_value):
        #   pdf(y|x=x0) 
        # = pdf(x=x0,y)/pdf(x=x0)
        # = (\Sigma_i c_i*pdf_i(x=x0,y)) / (\Sigma_i c_i*pdf_i(x=x0))
        # = (\Sigma_i c_i*pdf_i(x=x0)*pdf_i(y|x=x0)) / (\Sigma_i c_i*pdf_i(x=x0))
        # = (\Sigma_i w_i*pdf_i(y|x=x0))
        # where w_i = c_i*pdf_i(x=x0) / (\Sigma_j c_j*pdf_j(x=x0))
        weighted_probs = [0.0] * self.n
        for i in range(self.n):
            weighted_probs[i] = self.probs[i] * self.distributions[i].marginal_over_x().evaluate(x_value)
        denom = sum(weighted_probs)
        weighted_probs = [x/denom for x in weighted_probs]
        return MixtureDistribution(weighted_probs, [d.condition_on_x(x_value) for d in self.distributions])
    def get_limits(self):
        lower = [math.inf]*self.dim
        upper = [-math.inf]*self.dim
        for i in range(self.n):
            if self.probs[i] > 0:
                lower_d, upper_d = self.distributions[i].get_limits()
                lower = [min(l, l_d) for l, l_d in zip(lower, lower_d)]
                upper = [max(u, u_d) for u, u_d in zip(upper, upper_d)]
        return lower, upper

def kernel_density_estimation(samples, h):
    ds = [Gaussian2D(coord[0], coord[1], h, h, 0) for coord in samples]
    return MixtureDistribution([1/len(samples)]*len(samples), ds) # This representation is TOO costly...

def linear_regression_estimation(samples, xmin, xmax):
    """
    (Conditional) Negative log likelihood = -1/m*Sigma_{i=1}^m log P_model(y_i|x_i) =
    1/(2*sigma**2) * (\Sigma_{i=1}^m (y_i-a*x_i-b)**2)/m - log(1/sqrt(2*pi)/sigma)
    partial L / partial a = 0 <=> \bar{y} - a\bar{x} - b = 0
    partial L / partial b = 0 <=> \bar{xy} - a\bar{x^2} - b\bar{x} = 0
    a = (\bar{xy} - \bar{x}\bar{y}) / (\bar{x^2} - \bar{x}^2)
    b = \bar{y} - a\bar{x}
    partial L / partial sigma = -(\Sigma_{i=1}^m (y_i-a*x_i-b)**2/m)/sigma**3 + 1/sigma = 0
    => sigma**2 = \Sigma_{i=1}^m (y_i-a*x_i-b)**2/m
    """
    m = len(samples)
    xs, ys = zip(*samples)
    x_bar = sum(xs) / m
    y_bar = sum(ys) / m
    xy_bar = sum([x*y for x,y in samples]) / m
    xx_bar = sum([x*x for x,_ in samples]) / m
    a = (xy_bar - x_bar*y_bar) / (xx_bar - x_bar**2)
    b = y_bar - a*x_bar
    sigma = math.sqrt(sum([(y-a*x-b)**2 for x,y in samples])/m)
    return UniformXGaussianY(a,b,sigma,xmin,xmax)

def kl_divergence(d_data, d_model, conditional=False, debug=False):
    """For Conditional KL Divergence:
    KL(P_data(y|x) || P_model(y|x)) = E_{(x,y)~P_data} log{P_data(y|x)/P_model(y|x)}
    = Conditional Cross Entropy - Conditional Entropy
    """
    lower, upper = d_data.get_limits()
    n_steps = 50
    xs = np.linspace(lower[0], upper[0], n_steps)
    ys = np.linspace(lower[1], upper[1], n_steps)
    xs, ys = np.meshgrid(xs, ys)
    coords = np.stack([xs.ravel(), ys.ravel()], axis=-1)
    dxy = (upper[0]-lower[0])*(upper[1]-lower[1])/n_steps**2
    kl = 0.0
    p_data_sum, p_model_sum = 0.0, 0.0
    for coord in coords:
        p_data = d_data.evaluate(coord)
        p_data_sum += p_data * dxy
        if conditional:
            p_data_condition = d_data.condition_on_x(coord[0]).evaluate(coord[1])
            p_model_condition = d_model.condition_on_x(coord[0]).evaluate(coord[1])
            kl += p_data * dxy * math.log(p_data_condition / p_model_condition)
        else:
            p_model = d_model.evaluate(coord)
            p_model_sum += p_model * dxy
            kl += p_data * dxy * math.log(p_data / p_model)
    if debug:
        print("Integral of pdf of data distribution: %.4f, of model distribution: %.4f"%(p_data_sum, p_model_sum))
    return kl

def visualize_2d_distributions(d_data, d_model):
    lower, upper = d_data.get_limits()
    n_steps = 50
    xs = np.linspace(lower[0], upper[0], n_steps)
    ys = np.linspace(lower[1], upper[1], n_steps)
    xs, ys = np.meshgrid(xs, ys)
    coords = np.stack([xs.ravel(), ys.ravel()], axis=-1)
    
    p_data = np.array([d_data.evaluate(coord) for coord in coords]).reshape(n_steps, n_steps)
    p_model = np.array([d_model.evaluate(coord) for coord in coords]).reshape(n_steps, n_steps)
    
    fig, axes = plt.subplots(1, 2, subplot_kw={"projection": "3d"})
    ax_data, ax_model = axes
    ax_data.plot_surface(xs, ys, p_data, color='r')
    ax_data.set_title("PDF of Ground Truth")
    ax_model.plot_surface(xs, ys, p_model, color='b')
    ax_model.set_title("PDF of / output by our Model")
    
    for ax in axes:
        ax.set_xlabel("x")
        ax.set_ylabel("y")
    plt.show()

def visualize_1d_distributions(d_data, d_model, xmin, xmax, title):
    n_eval = 100
    xs = np.linspace(xmin, xmax, n_eval)
    p_datas = [d_data.evaluate(x) for x in xs]
    p_models = [d_model.evaluate(x) for x in xs]
    plt.plot(xs, p_datas, c='r')
    plt.plot(xs, p_models, c='b')
    plt.gca().set_xlim([xmin, xmax])
    plt.gca().title.set_text(title)

def visualize_conditional_distributions(d_data, d_model):
    x_divs = 9
    for div_id in range(x_divs):
        x_value = -0.7 + div_id * 0.2
        plt.subplot(3,3,div_id+1)
        visualize_1d_distributions(d_data.condition_on_x(x_value), d_model.condition_on_x(x_value), -1.5, 1.5, "x=%.2f"%x_value)
    plt.suptitle("Conditional Probability Distribution P(Y|X)")
    plt.show()

def visualize_samples(samples):
    xs, ys = zip(*samples)
    plt.scatter(xs, ys, marker='x', s=2)
    plt.gca().set_aspect('equal', adjustable='box')
    plt.title("Samples Drawn from Ground Truth Distribution")
    plt.show()

def demonstrate_kde():
    d1 = Gaussian2D(-0.1,0.3,0.5,0.1,0.8)
    d2 = Gaussian2D(0.1,-0.3,0.4,0.2,-0.8)
    d_data = MixtureDistribution([0.4, 0.6], [d1, d2])
    n_samples = 400
    samples = [d_data.sample() for _ in range(n_samples)]
    visualize_samples(samples)
    d_model = kernel_density_estimation(samples, 0.05)
    kl = kl_divergence(d_data, d_model, debug=True)
    print("kl divergence: %.4f" % kl)
    klc = kl_divergence(d_data, d_model, conditional=True)
    print("conditional kl divergence: %.4f"%klc)
    visualize_2d_distributions(d_data, d_model)
    visualize_conditional_distributions(d_data, d_model)

def demonstrate_mle_consistency():
    """Demonstrates that the the maximum likelihood estimator is consistent:
    Given enough observations, theta approaches the true theta, kl divergence converges to 0"""
    d_data = UniformXGaussianY(1, -0.5, 0.4, 0, 1)
    samples = [d_data.sample() for _ in range(100)]
    visualize_samples(samples)
    n_repeat = 100
    for n_samples in [10, 30, 100, 300, 1000, 3000]:
        klcs = list()
        for i_repeat in range(n_repeat):
            samples = [d_data.sample() for _ in range(n_samples)]
            d_model = linear_regression_estimation(samples, 0, 1)
            klc = kl_divergence(d_data, d_model, conditional=True)
            klcs.append(klc)
            if i_repeat == 0:
                print("samples = %d, example a = %.4f, b = %.4f, sigma = %.4f" % (n_samples, d_model.a, d_model.b, d_model.sigma))
        mean = sum(klcs) / n_repeat
        sd = math.sqrt(sum([(klc - mean)**2 for klc in klcs]) / n_repeat) # Uncorrected std deviation
        se_mean = sd / math.sqrt(n_samples) # Standard error of the mean
        print("samples = %d, conditional kl divergence: mean %.4f pm %.4f, sd %.4f" % (n_samples, mean, se_mean, sd))

def demonstrate_mle_misspecified():
    """Demonstrates that even if the model is misspecified, given enough observations,
    the Maximum Likelihood Estimator gives the closest distribution to the data distribution in terms of KL Divergence"""
    d1 = Gaussian2D(0.3,0.3,0.5,0.1,0.8)
    d2 = Gaussian2D(-0.3,-0.3,0.3,0.4,0.8)
    d_data = MixtureDistribution([0.4, 0.6], [d1, d2])
    samples = [d_data.sample() for _ in range(500)]
    visualize_samples(samples)
    samples = [d_data.sample() for _ in range(100000)] # We need enough samples for this trick to pull off
    d_model = linear_regression_estimation(samples, 0, 0)
    klc_optim = kl_divergence(d_data, d_model, conditional=True)
    a,b,sigma = d_model.a, d_model.b, d_model.sigma
    print("a = %.4f, b = %.4f, sigma = %.4f" % (a,b,sigma))
    n_steps = 5
    klcs = np.zeros((n_steps,n_steps,n_steps))
    for index in np.ndindex((n_steps,n_steps,n_steps)):
        coords = np.array(index)*0.05 + 0.9
        d_model = UniformXGaussianY(a*coords[0], b*coords[1], sigma*coords[2], 0, 1)
        klc = kl_divergence(d_data, d_model, conditional=True)
        klcs[index] = klc
        if coords[0] != 1 or coords[1] != 1 or coords[2] != 1:
            assert klc > klc_optim
    print(klcs)
    print("No Other Linear Regression Model knows the Data Distribution better than me!")

demonstrate_kde()
demonstrate_mle_consistency()
demonstrate_mle_misspecified()

xor-nn.py

import math
import random
import time
from bisect import bisect
from copy import deepcopy
import matplotlib.pyplot as plt
import numpy as np

class MultinomialDistribution:
    def __init__(self, probs):
        self.probs = probs

class Gaussian2D:
    def __init__(self, ux, uy, sigma_x, sigma_y, rho):
        """Covariance Matrix: 
        [sigma_x**2,  rho*sigma_x*sigma_y,\\ rho*sigma_x*sigma_y, sigma_y**2]
        """
        self.dim = 2
        self.ux, self.uy = ux, uy
        self.sigma_x, self.sigma_y = sigma_x, sigma_y
        self.rho = rho
    def evaluate(self, xy):
        x,y = xy
        c = 2*math.pi*self.sigma_x*self.sigma_y*math.sqrt(1-self.rho**2)
        c_exp = -1/(2*(1-self.rho**2))
        x_prime = (x-self.ux)/self.sigma_x
        y_prime = (y-self.uy)/self.sigma_y
        return (1/c)*math.exp(c_exp*(x_prime**2+y_prime**2-2*self.rho*x_prime*y_prime))
    def sample(self):
        z_x = random.gauss(0,1)
        z_y = random.gauss(0,1)
        x = self.sigma_x*z_x
        y = self.rho*self.sigma_y*z_x + math.sqrt(1-self.rho**2)*self.sigma_y*z_y
        return (x+self.ux, y+self.uy)
    def get_limits(self):
        return [self.ux-3*self.sigma_x, self.uy-3*self.sigma_y], \
                [self.ux+3*self.sigma_x, self.uy+3*self.sigma_y]

def probs_to_cum(probs):
    n = len(probs)
    cum = [0.0] * (n+1)
    for i in range(n):
            cum[i+1] = cum[i] + probs[i]
    assert abs(cum[-1]-1) < 1e-5
    return cum

def combine_limits_2d(limits):
    lower = [math.inf]*2
    upper = [-math.inf]*2
    for lower_i,upper_i in limits:
        lower = [min(l, l_d) for l, l_d in zip(lower, lower_i)]
        upper = [max(u, u_d) for u, u_d in zip(upper, upper_i)]
    return lower, upper

class Mixture2D:
    def __init__(self, probs, distributions):
        self.probs = probs
        self.distributions = distributions
        self.n = len(probs)
        self.cum = probs_to_cum(probs)
    def sample(self):
        id = bisect(self.cum, random.random()) - 1
        return self.distributions[id].sample()
    def evaluate(self, xy):
        p = 0.0
        for i in range(self.n):
            p += self.probs[i] * self.distributions[i].evaluate(xy)
        return p
    def get_limits(self):
        limits = [d.get_limits() for d in self.distributions]
        return combine_limits_2d(limits)

class Plane2Class:
    def __init__(self, probs, distributions):
        """probs: the probability for each class
        distbs: the probability distribution on the XY plane for each class
        """
        self.probs = probs
        self.distributions = distributions
        self.n = len(probs)
        self.cum = probs_to_cum(probs)
    def sample(self):
        class_id = bisect(self.cum, random.random()) - 1
        x,y = self.distributions[class_id].sample()
        return (x,y,class_id)
    def evaluate(self, xyc):
        # P(x,y,c) = P(x,y|c) * P(c)
        x,y,c = xyc
        return self.probs[c]*self.distributions[c].evaluate((x,y))
    def condition_on_xy(self, xy):
        # P(c|x,y) = P(x,y,c) / P(x,y)
        probs = [0.0]*self.n
        for c in range(self.n):
            probs[c] = self.probs[c]*self.distributions[c].evaluate(xy)
        p_xy = sum(probs)
        probs = [p/p_xy for p in probs]
        return MultinomialDistribution(probs)
    def get_limits(self):
        limits = [d.get_limits() for d in self.distributions]
        return combine_limits_2d(limits)

def visualize_samples(samples):
    xs, ys, cs = zip(*samples)
    plt.scatter(xs, ys, c=cs, marker='x', s=2)
    plt.gca().set_aspect('equal', adjustable='box')
    plt.title("Samples Drawn from Ground Truth Distribution")
    plt.show()

def softmax(logits):
    probs = np.exp(logits)
    return probs / np.sum(probs)

class Plane2ClassNN:
    def __init__(self, n_hidden, n_class, init_seed=0, float16=False):
        np.random.seed(init_seed)
        self.n_hidden, self.n_class = n_hidden, n_class
        t = np.float16 if float16 else np.float32
        self.w1 = 2*np.random.rand(2,n_hidden).astype(t) - 1
        self.b1 = 2*np.random.rand(n_hidden).astype(t) - 1
        self.w2 = 2*np.random.rand(n_hidden,n_class).astype(t) - 1
        self.b2 = 2*np.random.rand(n_class).astype(t) - 1
    def condition_on_xy(self, xy):
        xy = np.array(xy)
        h = np.dot(xy,self.w1)+self.b1
        h = np.maximum(h, 0) # ReLU Activation
        logits = np.dot(h,self.w2)+self.b2
        return MultinomialDistribution(softmax(logits))
    def mle_using_gd(self, samples, lr):
        """Orders of magnitude slower than mle_using_gd_batched, each sample involves overhead for function
        calls, the overhead accumulates because we process each sample individually. Plus, it is not 
        vectorized, which benefits from optimized low-level libraries"""
        m = len(samples)
        nll = 0.0 # Negative Log Likelihood
        p_b2 = np.zeros(self.n_class)
        p_w2 = np.zeros((self.n_hidden,self.n_class))
        p_b1 = np.zeros(self.n_hidden)
        p_w1 = np.zeros((2,self.n_hidden))
        for x,y,c in samples:
            # Forward
            xy = np.array([x,y])
            h = np.dot(xy,self.w1)+self.b1
            h = np.maximum(h, 0)
            logits = np.dot(h,self.w2)+self.b2
            probs = softmax(logits)
            nll += -math.log(probs[c])
            # Backwards
            p_logits = np.copy(probs)
            p_logits[c] -= 1.0
            p_b2 += p_logits
            p_w2 += np.outer(h,p_logits)
            p_h = np.dot(p_logits,self.w2.T)
            p_h = p_h * (h > 0)
            p_b1 += p_h
            p_w1 += np.outer(xy,p_h)
        self.w1 -= p_w1 * lr / m
        self.b1 -= p_b1 * lr / m
        self.w2 -= p_w2 * lr / m
        self.b2 -= p_b2 * lr / m
        return nll
    def mle_using_gd_batched(self, samples, lr):
        xs,ys,cs = zip(*samples)
        # Forward
        xy = np.array([xs,ys]).T
        h = np.dot(xy,self.w1)+self.b1 # b1 is broadcast
        h = np.maximum(h,0)
        logits = np.dot(h,self.w2)+self.b2
        probs = np.exp(logits)
        probs = probs / probs.sum(axis=1,keepdims=True) # The first dimension is batch
        logprobs = np.log(probs)
        mask = np.eye(self.n_class)[list(cs)]
        nll = -np.multiply(logprobs,mask).sum()
        # Backward
        p_logits = np.copy(probs) - mask
        p_b2 = np.mean(p_logits,axis=0)
        p_w2 = np.mean(np.einsum('ij,ik->ijk',h,p_logits), axis=0)
        p_h = np.dot(p_logits,self.w2.T)
        p_h = p_h * (h>0)
        p_b1 = np.mean(p_h,axis=0)
        p_w1 = np.mean(np.einsum('ij,ik->ijk',xy,p_h), axis=0)
        # update
        self.w1 -= p_w1 * lr
        self.b1 -= p_b1 * lr
        self.w2 -= p_w2 * lr
        self.b2 -= p_b2 * lr
        return nll

def train_nn(d_data, n_samples, nll_alpha, seed, epoch_cutoff=500000, nll_check_every=1000, resample=False):
    random.seed(seed)
    samples = [d_data.sample() for _ in range(n_samples)]
    d_model = Plane2ClassNN(5, 2, init_seed=seed)
    prev_nll = math.inf
    epoch_cnt = 0
    lr_sequence = [1e-3, 5e-4, 2e-4, 1e-4, 5e-5, 2e-5, 1e-5, 5e-6, 2e-6, 1e-6]
    lr_id = 0
    start = time.time()
    for epoch in range(epoch_cutoff):
        epoch_cnt = epoch
        if resample:
            samples = [d_data.sample() for _ in range(n_samples)]
        nll = d_model.mle_using_gd_batched(samples, lr_sequence[lr_id])
        if epoch % nll_check_every == 0:
            if nll > prev_nll * nll_alpha:
                lr_id += 1
                if lr_id >= len(lr_sequence):
                    break
            else:
                prev_nll = nll
    end = time.time()
    return d_model, nll, epoch_cnt, end-start

def conditional_kl_divergence(d_data, d_model):
    """KL(P_data(c|x,y) || P_model(c|x,y))
    = E_{(x,y,c)~P_data} log{P_data(c|x,y)/P_model(c|x,y)}
    = Integrate_{x,y,c} P_data(x,y,c) log{...}
    = Conditional Cross Entropy - Conditional Entropy
    """
    lower, upper = d_data.get_limits()
    n_steps = 50
    xs = np.linspace(lower[0], upper[0], n_steps)
    ys = np.linspace(lower[1], upper[1], n_steps)
    xs, ys = np.meshgrid(xs, ys)
    coords = np.stack([xs.ravel(), ys.ravel()], axis=-1)
    dxy = (upper[0]-lower[0])*(upper[1]-lower[1])/n_steps**2
    cross_entropy, conditional_entropy = 0.0, 0.0
    p_data_sum = 0.0
    for x,y in coords:
        d_data_c_on_xy = d_data.condition_on_xy((x,y))
        d_model_c_on_xy = d_model.condition_on_xy((x,y))
        for c in range(d_data.n):
            p_data = d_data.evaluate((x,y,c)) * dxy
            p_data_sum += p_data
            cross_entropy += p_data * math.log(1/d_model_c_on_xy.probs[c])
            conditional_entropy += p_data * math.log(1/d_data_c_on_xy.probs[c])
    return cross_entropy, conditional_entropy

def conditional_kl_divergence_mc(d_data, d_model, n_test):
    """Use Monte Carlo Integration with importance sampling to estimate cross entropy
    Essentially, we return the cross entropy on a newly sampled "testing set"
    In theory, should return the same value compared to conditional_kl_divergence()
    """
    test_samples = [d_data.sample() for _ in range(n_test)]
    cross_entropy = 0.0
    for x,y,c in test_samples:
        cross_entropy -= math.log(d_model.condition_on_xy((x,y)).probs[c])
    return cross_entropy/n_test

def visualize_2d_distributions(d_data, d_model):
    """Visualize the probability of the first class on every point of the plane"""
    lower, upper = d_data.get_limits()
    n_steps = 50
    xs = np.linspace(lower[0], upper[0], n_steps)
    ys = np.linspace(lower[1], upper[1], n_steps)
    xs, ys = np.meshgrid(xs, ys)
    coords = np.stack([xs.ravel(), ys.ravel()], axis=-1)
    
    p_data = np.array([d_data.condition_on_xy((x,y)).probs[0] for x,y in coords]).reshape(n_steps, n_steps)
    p_model = np.array([d_model.condition_on_xy((x,y)).probs[0] for x,y in coords]).reshape(n_steps, n_steps)
    
    fig, axes = plt.subplots(1, 2, subplot_kw={"projection": "3d"})
    ax_data, ax_model = axes
    ax_data.contourf(xs, ys, p_data, 100)
    ax_data.set_title("Conditional Probability for class=0, Ground Truth")
    ax_model.contourf(xs, ys, p_model, 100)
    ax_model.set_title("Conditional Probability for class=0, NN Model")
    
    for ax in axes:
        ax.set_xlabel("x")
        ax.set_ylabel("y")
    plt.show()

def perturb_param(w:np.array, eps:float):
    """w <- w + eps*norm(w)*v where v is sampled from a uniform Nd unit sphere"""
    # A direction from the Nd unit sphere can be sampled by generating a standard normal random vector
    # ( x_i i.i.d N(0,1) ) then normalizing the vector.
    # this is because the standard normal random vector is spherically symmetric:
    # p(X) = ... * exp(-1/2*(x-mu)^T Sigma^{-1} (x-mu)) = ... * exp(||x||_2^2)
    delta_w = np.random.standard_normal(size=w.shape)
    delta_w = delta_w / np.linalg.norm(delta_w)
    w += eps*np.linalg.norm(w)*delta_w

def optimality_test(d_data:Plane2Class, d_model:Plane2ClassNN, eps, n_var):
    """Test whether the trained model is better than random variations.
    Since the ReLU network is scale invariant (network is unchanged if we multiply the weights in one layer
    by 10 and dividing the next layer by 10), we scale the pertubation according to the parameter
    """
    cross = conditional_kl_divergence_mc(d_data, d_model, 10000)
    cross_mods = list()
    n_optimal = 0
    for _ in range(n_var):
        d_mod = deepcopy(d_model)
        for w in [d_mod.w1, d_mod.b1, d_mod.w2, d_mod.b2]:
            perturb_param(w,eps)
        cross_mod = conditional_kl_divergence_mc(d_data, d_mod, 10000)
        cross_mods.append(cross_mod)
        if cross_mod > cross:
            n_optimal += 1
    return cross, cross_mods, n_optimal/n_var

def printweights(d_model):
    print("w1:",d_model.w1)
    print("b1:",d_model.b1)
    print("w2:",d_model.w2)
    print("b2:",d_model.b2)

def demo_nn():
    """Demonstrate what the neural network has learned"""
    positive = Mixture2D([0.6, 0.4], [Gaussian2D(1,-1,.5,.5,0), Gaussian2D(-1,1,.7,.7,0)])
    negative = Mixture2D([0.3, 0.7], [Gaussian2D(1,1,.5,.5,0), Gaussian2D(-1,-1,.7,.7,0)])
    d_data = Plane2Class([0.4, 0.6], [positive, negative])
    samples = [d_data.sample() for _ in range(300)]
    visualize_samples(samples)
    d_model, nll, epoch_cnt, elapsed = train_nn(d_data, 100, 0.99, 0)
    cross, cond = conditional_kl_divergence(d_data, d_model)
    print("epoch %d, %.4f ms per epoch, nll %.4f, cross entropy %.4f, conditional entropy %.4f"%\
          (epoch_cnt, elapsed*1000/epoch_cnt , nll, cross, cond))
    cross_mc = conditional_kl_divergence_mc(d_data, d_model, 10000)
    print("cross entropy as measured using cross entropy loss on the test set %.4f"%cross_mc)
    printweights(d_model)
    visualize_2d_distributions(d_data, d_model)

def test_nn_optimality():
    positive = Mixture2D([0.6, 0.4], [Gaussian2D(1,-1,.5,.5,0), Gaussian2D(-1,1,.7,.7,0)])
    negative = Mixture2D([0.3, 0.7], [Gaussian2D(1,1,.5,.5,0), Gaussian2D(-1,-1,.7,.7,0)])
    d_data = Plane2Class([0.4, 0.6], [positive, negative])
    d_model, nll, epoch_cnt, elapsed = train_nn(d_data, 100, 0.99, 0)
    cross, cond = conditional_kl_divergence(d_data, d_model)
    print("epoch %d, %.4f ms per epoch, nll %.4f, cross entropy %.4f, conditional entropy %.4f"%\
          (epoch_cnt, elapsed*1000/epoch_cnt, nll, cross, cond))
    printweights(d_model)
    for eps in [1e-3, 2e-3, 5e-3, 1e-2, 2e-2, 5e-2, 1e-1, 2e-1, 5e-1, 1.0]:
        print("eps=%.4f"%eps)
        cross, _, optimal_ratio = optimality_test(d_data, d_model, eps, 50)
        print("Model is better than %.2f"%(optimal_ratio*100) +"% of random variations directions")

def test_generalization_bound():
    """
    According to PAC in computational learning theory,
    in concept learning which cares about accuracy Pr_{x~D}[h(x)==c(x)] instead of kL divergence,
    for any given hypothesis h:
    Pr[error_train(h) - error_test(h) > eps] <= exp{-2*m*eps**2}
    where m is the number of training samples.
    The probability that there exists a hypothesis that has generalization error greater than eps is less
    |H|*exp{-2*m*eps**2}
    To bound that probability to delta, we need
    m >= 1/(2*eps**2) * (ln|H| - ln(1/delta))
    \approx 1/(2*eps**2) * ln|H|
    = 8*ln2/(2*eps**2) * model_size_in_bytes
    For a ballpark figure, to achieve < 10% generalization error, we need 277*model_size_byte training samples
    """
    positive = Mixture2D([0.6, 0.4], [Gaussian2D(1,-1,.7,.7,0), Gaussian2D(-1,1,.5,.5,0)])
    negative = Mixture2D([0.3, 0.7], [Gaussian2D(1,1,.7,.7,0), Gaussian2D(-1,-1,.5,.5,0)])
    d_data = Plane2Class([0.4, 0.6], [positive, negative])
    n_train = 4000
    samples = [d_data.sample() for _ in range(n_train)]
    visualize_samples(samples)
    n_hidden = 5
    float16 = False
    # 5 hidden units and float16 is insufficient to learn xor
    # However, 5 hidden units and float32 can learn xor (low precision hinders sgd process?)
    d_model = Plane2ClassNN(n_hidden, 2, init_seed=0, float16=float16)
    train_acc = None
    for epoch in range(10000):
        nll = d_model.mle_using_gd_batched(samples, 1e-3)
        if epoch % 1000 == 0:
            # try to calculate "training error"
            n_correct = 0
            for x,y,c in samples:
                if d_model.condition_on_xy((x,y)).probs.argmax()==c:
                    n_correct += 1
            train_acc = n_correct/n_train
            print("nll %.4f, accuracy %.4f" % (nll, train_acc))
    n_test = 1000
    test_samples = [d_data.sample() for _ in range(n_test)]
    n_correct = 0
    for x,y,c in test_samples:
        if d_model.condition_on_xy((x,y)).probs.argmax()==c:
            n_correct += 1
    test_acc = n_correct/n_test
    print("train acc: %.4f, test acc: %.4f, generalization err: %.4f" % (train_acc, test_acc, train_acc-test_acc))
    # The approximated upper bound for eps
    model_size_bytes = (2*n_hidden+n_hidden+n_hidden*2+2)*(2 if float16 else 4)
    eps_guarantee = math.sqrt(4*math.log(2)*model_size_bytes/n_train)
    print("eps should be less than %.4f almost certainly" % eps_guarantee)

# demo_nn()
test_nn_optimality()
test_generalization_bound()

Comments

Leave a Reply

Your email address will not be published. Required fields are marked *