import demjson
import math
import matplotlib.pyplot as plt
import numpy
import json
import random
import scipy
import sklearn
import string
import tensorflow as tf
from collections import defaultdict # Dictionaries that take a default for missing entries
from sklearn import linear_model
Data is available at http://cseweb.ucsd.edu/~jmcauley/pml/data/. Download it and save it to your own directory.
dataDir = "/home/jmcauley/pml_data/"
def sigmoid(x):
    """Logistic (sigmoid) function, mapping any real x into (0, 1).

    Computed in a numerically stable way: the naive 1 / (1 + exp(-x))
    overflows for large negative x (math.exp raises OverflowError once
    -x exceeds ~709), so for x < 0 we use the algebraically equivalent
    exp(x) / (1 + exp(x)) instead.
    """
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)
# Plot the sigmoid over [-7, 7] with reference lines marking x = 0 and y = 0.5
X = numpy.arange(-7,7.1,0.1)
Y = [sigmoid(x) for x in X]
plt.plot(X, Y, color='black')
plt.plot([0,0],[-2,2], color = 'black', linewidth=0.5)  # vertical line at x = 0
plt.plot([-7,7],[0.5,0.5], color = 'k', linewidth=0.5, linestyle='--')  # midpoint y = 0.5
plt.xlim(-7, 7)
plt.ylim(-0.1, 1.1)
plt.xticks([-5,0,5])
plt.xlabel("$x$")
plt.ylabel(r"$\sigma(x)$")
plt.title("Sigmoid function")
plt.show()
Read a small dataset of beer reviews
# Read the beer reviews, keeping only the reviews whose user specified a gender
path = dataDir + "beer_50000.json"
data = []
with open(path) as f:  # context manager guarantees the file is closed
    for l in f:
        if 'user/gender' in l: # Discard users who didn't specify gender
            # NOTE(review): eval on file contents is unsafe for untrusted
            # input (the lines appear to be Python dict literals rather than
            # strict JSON); ast.literal_eval would be a safer drop-in.
            d = eval(l) # demjson is more secure but much slower!
            data.append(d)
data[0]
Predict the user's gender from the length of their review
X = [[1, len(d['review/text'])] for d in data]
y = [d['user/gender'] == 'Female' for d in data]
Fit the model
mod = sklearn.linear_model.LogisticRegression()
mod.fit(X,y)
Calculate the accuracy of the model
predictions = mod.predict(X) # Binary vector of predictions
correct = predictions == y # Binary vector indicating which predictions were correct
sum(correct) / len(correct)
Accuracy seems surprisingly high! Check against the number of positive labels...
1 - (sum(y) / len(y))
Accuracy is identical to the proportion of "males" in the data. Confirm that the model is never predicting positive
sum(predictions)
Use the class_weight='balanced' option to implement the balanced classifier
mod = sklearn.linear_model.LogisticRegression(class_weight='balanced')
mod.fit(X,y)
predictions = mod.predict(X)
Compute the accuracy of the balanced model
correct = predictions == y
sum(correct) / len(correct)
TP = sum([(p and l) for (p,l) in zip(predictions, y)])
FP = sum([(p and not l) for (p,l) in zip(predictions, y)])
TN = sum([(not p and not l) for (p,l) in zip(predictions, y)])
FN = sum([(not p and l) for (p,l) in zip(predictions, y)])
print("TP = " + str(TP))
print("FP = " + str(FP))
print("TN = " + str(TN))
print("FN = " + str(FN))
Can rewrite the accuracy in terms of these metrics
(TP + TN) / (TP + FP + TN + FN)
TPR = TP / (TP + FN)
TNR = TN / (TN + FP)
TPR,TNR
BER = 1 - 1/2 * (TPR + TNR)
BER
precision = TP / (TP + FP)
recall = TP / (TP + FN)
precision, recall
F1 score
F1 = 2 * (precision*recall) / (precision + recall)
F1
# Read the small (500-review) beer dataset
path = dataDir + "beer_500.json"
data = []
with open(path) as f:  # context manager guarantees the file is closed
    for l in f:
        # NOTE(review): eval on file contents is unsafe for untrusted input
        # (the lines appear to be Python dict literals rather than strict
        # JSON); ast.literal_eval would be a safer drop-in for local data.
        d = eval(l)
        data.append(d)
Randomly sort the data (so that train and test are iid)
random.seed(0)
random.shuffle(data)
Predict overall rating from ABV
X1 = [[1] for d in data] # Model *without* the feature
X2 = [[1, d['beer/ABV']] for d in data] # Model *with* the feature
y = [d['review/overall'] for d in data]
Fit the two models (with and without the feature)
model1 = sklearn.linear_model.LinearRegression(fit_intercept=False)
model1.fit(X1[:250], y[:250]) # Train on first half
residuals1 = model1.predict(X1[250:]) - y[250:] # Test on second half
model2 = sklearn.linear_model.LinearRegression(fit_intercept=False)
model2.fit(X2[:250], y[:250])
residuals2 = model2.predict(X2[250:]) - y[250:]
Residual sum of squares for both models
rss1 = sum([r**2 for r in residuals1])
rss2 = sum([r**2 for r in residuals2])
k1,k2 = 1,2 # Number of parameters of each model
n = len(residuals1) # Number of samples
F statistic (results may vary for different random splits)
F = ((rss1 - rss2) / (k2 - k1)) / (rss2 / (n-k2))
scipy.stats.f.cdf(F,k2-k1,n-k2)
Small dataset of fantasy reviews
path = dataDir + "fantasy_100.json"
f = open(path)
data = []
for l in f:
d = json.loads(l)
data.append(d)
f.close()
Predict rating from review length
ratings = [d['rating'] for d in data]
lengths = [len(d['review_text']) for d in data]
X = numpy.matrix([[1,l] for l in lengths])
y = numpy.matrix(ratings).T
First check the coefficients if we fit the model using sklearn
model = sklearn.linear_model.LinearRegression(fit_intercept=False)
model.fit(X, y)
theta = model.coef_
theta
Convert features and labels to tensorflow structures
X = tf.constant(X, dtype=tf.float32)
y = tf.constant(y, dtype=tf.float32)
Build tensorflow regression class
class regressionModel(tf.keras.Model):
    """Linear regression expressed as a Keras model.

    Predictions are X @ theta; training minimizes mean squared error plus an
    optional regularization penalty weighted by lamb.
    """
    def __init__(self, M, lamb):
        super(regressionModel, self).__init__()
        # Parameter column vector (M x 1), initialized to all zeros
        zeros = tf.constant([0.0] * M, shape=[M, 1], dtype=tf.float32)
        self.theta = tf.Variable(zeros)
        self.lamb = lamb
    # Prediction (for a matrix of instances)
    def predict(self, X):
        return tf.matmul(X, self.theta)
    # Mean Squared Error over all instances
    def MSE(self, X, y):
        residual = tf.matmul(X, self.theta) - y
        return tf.reduce_mean(residual ** 2)
    # L2 regularizer
    def reg(self):
        return self.lamb * tf.reduce_sum(self.theta ** 2)
    # L1 regularizer (available but not used by call())
    def reg1(self):
        return self.lamb * tf.reduce_sum(tf.abs(self.theta))
    # Loss = MSE + L2 penalty
    def call(self, X, y):
        return self.MSE(X, y) + self.reg()
Initialize the model (lambda = 0)
optimizer = tf.keras.optimizers.Adam(0.1)
model = regressionModel(len(X[0]), 0)
Train for 1000 iterations of gradient descent (could implement more careful stopping criteria)
# Standard TF2 custom training loop: record the loss under a GradientTape,
# differentiate it w.r.t. the model's trainable variables, and apply one
# Adam update per iteration.
for iteration in range(1000):
    with tf.GradientTape() as tape:
        loss = model(X, y)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
Confirm that we get a similar result to what we got using sklearn
model.theta
Make a few predictions using the model
model.predict(X)[:10]
Predict whether rating is above 4 from length
X = numpy.matrix([[1,l*0.0001] for l in lengths]) # Rescale the lengths for conditioning
y_class = numpy.matrix([[r > 4 for r in ratings]]).T
Convert to tensorflow structures
X = tf.constant(X, dtype=tf.float32)
y_class = tf.constant(y_class, dtype=tf.float32)
Tensorflow classification class
class classificationModel(tf.keras.Model):
    """Logistic regression expressed as a Keras model.

    Predicted probabilities are sigmoid(X @ theta); training minimizes the
    binary cross-entropy plus an optional L2 penalty weighted by lamb.
    """
    def __init__(self, M, lamb):
        super(classificationModel, self).__init__()
        # Parameter column vector (M x 1), initialized to all zeros
        zeros = tf.constant([0.0] * M, shape=[M, 1], dtype=tf.float32)
        self.theta = tf.Variable(zeros)
        self.lamb = lamb
    # Probability (for a matrix of instances)
    def predict(self, X):
        return tf.math.sigmoid(tf.matmul(X, self.theta))
    # Objective: negative mean log-likelihood (binary cross-entropy).
    # NOTE(review): log(pred) is -inf if a probability saturates at exactly
    # 0 or 1; clipping pred away from {0, 1} would be more robust — confirm
    # whether saturation occurs on this data before changing.
    def obj(self, X, y):
        pred = self.predict(X)
        pos = y * tf.math.log(pred)
        neg = (1.0 - y) * tf.math.log(1.0 - pred)
        return -tf.reduce_mean(pos + neg)
    # Same objective, using tensorflow short-hand
    def obj_short(self, X, y):
        pred = self.predict(X)
        bce = tf.keras.losses.BinaryCrossentropy()
        return tf.reduce_mean(bce(y, pred))
    # L2 regularizer
    def reg(self):
        return self.lamb * tf.reduce_sum(self.theta ** 2)
    # Loss = cross-entropy + L2 penalty
    def call(self, X, y):
        return self.obj(X, y) + self.reg()
Initialize the model (lambda = 0)
optimizer = tf.keras.optimizers.Adam(0.1)
model = classificationModel(len(X[0]), 0)
Run for 1000 iterations
for iteration in range(1000):
with tf.GradientTape() as tape:
loss = model(X, y_class)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
model.theta
Model predictions (as probabilities via the sigmoid function)
model.predict(X)[:10]
def parseData(fname):
    """Lazily yield one parsed record per line of fname.

    Uses a context manager so the file handle is closed even if the caller
    abandons the generator early (the original left the file open in that
    case).  NOTE(review): eval on file contents is unsafe for untrusted
    input — these lines appear to be Python dict literals, not strict JSON;
    ast.literal_eval would be a safer drop-in for local data.
    """
    with open(fname) as f:
        for l in f:
            yield eval(l)
Just read the first 5000 reviews (deliberately making a model that will overfit if not carefully regularized)
data = list(parseData(dataDir + "beer_50000.json"))[:5000]
Fit a simple bag-of-words model (see Chapter 8 for more details)
wordCount = defaultdict(int)
punctuation = set(string.punctuation)
for d in data: # Strictly, should just use the *training* data to extract word counts
r = ''.join([c for c in d['review/text'].lower() if not c in punctuation])
for w in r.split():
wordCount[w] += 1
counts = [(wordCount[w], w) for w in wordCount]
counts.sort()
counts.reverse()
1000 most popular words
words = [x[1] for x in counts[:1000]]
wordId = dict(zip(words, range(len(words))))
wordSet = set(words)
Bag-of-words features for 1000 most popular words
def feature(datum):
    """Bag-of-words feature vector for one review: counts of the 1000 most
    popular words (indexed by wordId), followed by a constant offset term."""
    feat = [0]*len(words)
    r = ''.join([c for c in datum['review/text'].lower() if not c in punctuation])
    for w in r.split():
        # O(1) set membership; the original tested 'w in words', a linear
        # scan of the 1000-word list per token (wordSet was built for this)
        if w in wordSet:
            feat[wordId[w]] += 1
    feat.append(1) # offset
    return feat
random.shuffle(data)
X = [feature(d) for d in data]
y = [d['review/overall'] for d in data]
Ntrain,Nvalid,Ntest = 4000,500,500
Xtrain,Xvalid,Xtest = X[:Ntrain],X[Ntrain:Ntrain+Nvalid],X[Ntrain+Nvalid:]
ytrain,yvalid,ytest = y[:Ntrain],y[Ntrain:Ntrain+Nvalid],y[Ntrain+Nvalid:]
Unregularized model (train on training set, test on test set)
model = sklearn.linear_model.LinearRegression(fit_intercept=False)
model.fit(Xtrain, ytrain)
predictions = model.predict(Xtest)
sum((ytest - predictions)**2)/len(ytest) # Mean squared error
Regularized model ("ridge regression")
model = linear_model.Ridge(1.0, fit_intercept=False) # MSE + 1.0 l2
model.fit(Xtrain, ytrain)
predictions = model.predict(Xtest)
sum((ytest - predictions)**2)/len(ytest)
Track the model which works best on the validation set
bestModel = None
bestVal = None
bestLamb = None
Train models for different values of lambda (or C). Keep track of the best model on the validation set.
# Sweep regularization strengths; record train/validation MSE for each and
# remember whichever model has the lowest validation error
ls = [0.01, 0.1, 1, 10, 100, 1000, 10000]
errorTrain = []
errorValid = []
for l in ls:
    model = sklearn.linear_model.Ridge(l)
    model.fit(Xtrain, ytrain)
    predictTrain = model.predict(Xtrain)
    MSEtrain = sum((ytrain - predictTrain)**2)/len(ytrain)
    errorTrain.append(MSEtrain)
    predictValid = model.predict(Xvalid)
    MSEvalid = sum((yvalid - predictValid)**2)/len(yvalid)
    errorValid.append(MSEvalid)
    print("l = " + str(l) + ", validation MSE = " + str(MSEvalid))
    # 'is None' rather than '== None': identity is the correct None check
    if bestVal is None or MSEvalid < bestVal:
        bestVal = MSEvalid
        bestModel = model
        bestLamb = l
Using the best model from the validation set, compute the error on the test set
predictTest = bestModel.predict(Xtest)
MSEtest = sum((ytest - predictTest)**2)/len(ytest)
MSEtest
Plot the train/validation/test error associated with this pipeline
plt.xticks([])
plt.xlabel(r"$\lambda$")
plt.ylabel(r"error (MSE)")
plt.title(r"Validation Pipeline")
plt.xscale('log')
plt.plot(ls, errorTrain, color='k', linestyle='--', label='training error')
plt.plot(ls, errorValid, color='grey',zorder=4,label="validation error")
plt.plot([bestLamb], [MSEtest], linestyle='', marker='x', color='k', label="test error")
plt.legend(loc='lower left')
plt.show()
Same data as pipeline above, slightly bigger dataset
data = list(parseData(dataDir + "beer_50000.json"))[:10000]
Simple bag-of-words model (as in pipeline above, and in Chapter 8)
wordCount = defaultdict(int)
punctuation = set(string.punctuation)
for d in data:
r = ''.join([c for c in d['review/text'].lower() if not c in punctuation])
for w in r.split():
wordCount[w] += 1
counts = [(wordCount[w], w) for w in wordCount]
counts.sort()
counts.reverse()
words = [x[1] for x in counts[:1000]]
wordId = dict(zip(words, range(len(words))))
wordSet = set(words)
def feature(datum):
    """Bag-of-words feature vector over the 1000 most popular words
    (indexed by wordId), followed by a constant offset term."""
    feat = [0]*len(words)
    r = ''.join([c for c in datum['review/text'].lower() if not c in punctuation])
    for w in r.split():
        # O(1) set membership; the original tested 'w in words', scanning
        # the 1000-word list once per token (wordSet exists for this)
        if w in wordSet:
            feat[wordId[w]] += 1
    feat.append(1) # offset
    return feat
Predict whether the ABV is above 6.7 (roughly, above average) from the review text
random.shuffle(data)
X = [feature(d) for d in data]
y = [d['beer/ABV'] > 6.7 for d in data]
Train on first 9000 reviews, test on last 1000
mod = sklearn.linear_model.LogisticRegression(max_iter=5000)
mod.fit(X[:9000],y[:9000])
predictions = mod.predict(X[9000:]) # Binary vector of predictions
correct = predictions == y[9000:]
Accuracy
sum(correct) / len(correct)
To compute precision and recall, we want the output probabilities (or scores) rather than the predicted labels
probs = mod.predict_proba(X[9000:]) # could also use mod.decision_function
Build a simple data structure that contains the score, the predicted label, and the actual label (on the test set)
probY = list(zip([p[1] for p in probs], [p[1] > 0.5 for p in probs], y[9000:]))
For example...
probY[:10]
Sort this so that the most confident predictions come first
probY.sort(reverse=True)
For example...
probY[:10]
xROC = []
yROC = []
# The labels (and hence the class totals TP+FN and TN+FP) are the same at
# every threshold, so compute them once instead of per threshold.  The
# original also carried a dead 'len(labs) == 0' guard (labs is always the
# full test set); removed.
labs = [x[2] for x in probY]
nPos = sum(labs)              # total positives = TP + FN at any threshold
nNeg = len(labs) - nPos       # total negatives = TN + FP at any threshold
# Sweep the decision threshold from 0 to 1 to trace out the ROC curve
for thresh in numpy.arange(0,1.01,0.01):
    preds = [x[0] > thresh for x in probY]
    TP = sum([(a and b) for (a,b) in zip(preds,labs)])
    FP = sum([(a and not b) for (a,b) in zip(preds,labs)])
    TPR = TP / nPos # True positive rate
    FPR = FP / nNeg # False positive rate
    xROC.append(FPR)
    yROC.append(TPR)
plt.plot(xROC,yROC,color='k')
plt.plot([0,1],[0,1], lw=0.5, color='grey')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.show()
xPR = []
yPR = []
# Walk the confidence-sorted predictions from most to least confident,
# maintaining a running count of true labels among the top i.  This replaces
# the original O(n^2) re-slicing of probY[:i] (which also computed an unused
# 'preds' list) with a single O(n) pass; the precision/recall values are
# identical: prec = (positives in top i) / i, rec = (positives in top i) / nPos.
nPos = sum(y[9000:])          # total positives in the test set
positivesSeen = 0
for i in range(1,len(probY)+1):
    positivesSeen += probY[i-1][2]
    prec = positivesSeen / i
    rec = positivesSeen / nPos
    xPR.append(rec)
    yPR.append(prec)
plt.plot(xPR,yPR,color='k')
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.ylim(0.4,1.01)
plt.plot([0,1],[sum(y[9000:]) / 1000, sum(y[9000:]) / 1000], lw=0.5, color = 'grey')
plt.title("Precision/Recall Curve")
plt.show()
path = dataDir + "beer_50000.json"
f = open(path)
data = []
for l in f:
data.append(eval(l))
f.close()
Count occurrences of each style
categoryCounts = defaultdict(int)
for d in data:
categoryCounts[d['beer/style']] += 1
categories = [c for c in categoryCounts if categoryCounts[c] > 1000]
catID = dict(zip(list(categories),range(len(categories))))
Build one-hot encoding using common styles
def feat(d):
    """One-hot encode the beer's style over the common categories in catID,
    followed by a constant offset feature (all zeros + offset for rare styles)."""
    onehot = [0] * len(catID)
    style = d['beer/style']
    if style in catID:
        onehot[catID[style]] = 1
    onehot.append(1)  # offset term
    return onehot
X = [feat(d) for d in data]
y = [d['beer/ABV'] > 5 for d in data]
mod = sklearn.linear_model.LogisticRegression()
mod.fit(X,y)
Compute and report metrics
def metrics(y, ypred):
    """Print the true-positive rate, true-negative rate, and Balanced Error
    Rate of binary predictions ypred against binary labels y."""
    pairs = list(zip(y, ypred))
    TP = sum(1 for a, b in pairs if a and b)
    TN = sum(1 for a, b in pairs if not a and not b)
    FP = sum(1 for a, b in pairs if b and not a)
    FN = sum(1 for a, b in pairs if a and not b)
    TPR = TP / (TP + FN)
    TNR = TN / (TN + FP)
    # BER = average of the false-positive and false-negative rates
    BER = 1 - 0.5 * (TPR + TNR)
    print("TPR = " + str(TPR))
    print("TNR = " + str(TNR))
    print("BER = " + str(BER))
ypred = mod.predict(X)
metrics(y, ypred)
Balance the classifier using the 'balanced' option
mod = sklearn.linear_model.LogisticRegression(class_weight='balanced')
mod.fit(X,y)
ypred = mod.predict(X)
metrics(y, ypred)
Precision/recall curves
probs = mod.predict_proba(X)
probY = list(zip([p[1] for p in probs], [p[1] > 0.5 for p in probs], y))
probY.sort(reverse=True) # Sort data by confidence
xPR = []
yPR = []
for i in range(1,len(probY)+1,100):
preds = [x[1] for x in probY[:i]]
labs = [x[2] for x in probY[:i]]
prec = sum(labs) / len(labs)
rec = sum(labs) / sum(y)
xPR.append(rec)
yPR.append(prec)
plt.plot(xPR,yPR,color='k')
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.ylim(0.4,1.01)
plt.plot([0,1],[sum(y) / len(y), sum(y) / len(y)], lw=0.5, color = 'grey')
plt.title("Precision/Recall Curve")
plt.show()
Model pipeline
dataTrain = data[:25000]
dataValid = data[25000:37500]
dataTest = data[37500:]
def pipeline(reg):
    """Train a class-balanced logistic regressor with inverse regularization
    strength C=reg, print its Balanced Error Rate on the validation and test
    sets, and return the fitted model.

    Relies on the module-level dataTrain/dataValid/dataTest splits and the
    feat() feature extractor.
    """
    def BER(yTrue, yPred):
        # Balanced Error Rate: 1 minus the mean of TPR and TNR (the original
        # duplicated this computation verbatim for validation and test)
        TP = sum([(a and b) for (a,b) in zip(yTrue, yPred)])
        TN = sum([(not a and not b) for (a,b) in zip(yTrue, yPred)])
        FP = sum([(not a and b) for (a,b) in zip(yTrue, yPred)])
        FN = sum([(a and not b) for (a,b) in zip(yTrue, yPred)])
        TPR = TP / (TP + FN)
        TNR = TN / (TN + FP)
        return 1 - 0.5*(TPR + TNR)
    mod = linear_model.LogisticRegression(C=reg, class_weight='balanced')
    X = [feat(d) for d in dataTrain]
    y = [d['beer/ABV'] > 5 for d in dataTrain]
    Xvalid = [feat(d) for d in dataValid]
    yvalid = [d['beer/ABV'] > 5 for d in dataValid]
    Xtest = [feat(d) for d in dataTest]
    ytest = [d['beer/ABV'] > 5 for d in dataTest]
    mod.fit(X,y)
    ypredValid = mod.predict(Xvalid)
    ypredTest = mod.predict(Xtest)
    print("C = " + str(reg) + "; validation BER = " + str(BER(yvalid, ypredValid)))
    print("C = " + str(reg) + "; test BER = " + str(BER(ytest, ypredTest)))
    return mod
for c in [0.000001, 0.00001, 0.0001, 0.001]:
pipeline(c)
Fit the classification problem using a regular linear regressor
y_reg = [2.0*a - 1 for a in y] # Map data to {-1.0,1.0}
y_reg[:10]
model = sklearn.linear_model.LinearRegression(fit_intercept=False)
model.fit(X, y_reg)
yreg_pred = model.predict(X)
yreg_pred = [a > 0 for a in yreg_pred] # Map the outputs back to binary predictions
metrics(y, yreg_pred)