A versatile data API
Thus far we've been working with the MNIST dataset, which is small and therefore convenient, but it is not representative of a real-world data pipeline. Nowadays it's unheard of to be able to load the entire dataset into RAM and simply train away.
In order to work with a variety of data types and sizes we want to make a modular data API. Each module or 'block' will handle one task and be easily customizable.
Here are the blocks and their tasks:
%reload_ext autoreload
%autoreload 2
%matplotlib inline
#export
from exp.nb_08a import *
The ItemList
will be our base data container.
Essentially just holds a list of files and takes a list of transforms (i.e. functions) that it applies when the list is accessed via __getitem__
; using the notation self[key]
.
It is agnostic about what type of files - it could be text, images, or media of some sort.
The compose
function is passed the x
or item and applies the transform functions one at a time. To ensure the transforms are done in specific order it uses a sorting key based on the _order
. Then iterates through funcs
in order and calls each one passing x
and kwargs
#export
import PIL, os, mimetypes
from pathlib import Path
# Monkey patch: give every Path an `ls()` method that returns the entries of
# the directory (from `iterdir()`) as a list.
Path.ls = lambda x: list(x.iterdir())
Function composition definition from Wikipedia:
In computer science, function composition is an act or mechanism to combine simple functions to build more complicated ones. Like the usual composition of functions in mathematics, the result of each function is passed as the argument of the next, and the result of the last one is the result of the whole.
Basically, we need a compose
function that will take an input and apply a series of functions to it one after another in order and return the transformed input.
#export
def compose(x, funcs, *args, order_key="_order", **kwargs):
    """Thread `x` through every callable in `funcs` and return the final value.

    The callables are run in ascending order of the attribute named by
    `order_key` (a callable without that attribute counts as 0). Each one
    receives the previous result plus `**kwargs`.

    Note: `*args` is accepted but is not forwarded to the callables.
    `funcs` is normalised with `listify` (defined elsewhere) before sorting.
    """
    def sort_key(fn):
        # Missing order attribute -> treated as 0.
        return getattr(fn, order_key, 0)

    ordered = sorted(listify(funcs), key=sort_key)
    for fn in ordered:
        # Feed the output of one step into the next.
        x = fn(x, **kwargs)
    return x
#export
class ItemList(ListContainer):
    """A `ListContainer` of items with a base path and optional transforms.

    Indexing returns each selected item after it has been passed through
    `get` and then through the transforms in `tfms` (via `compose`), so the
    processing happens lazily at access time.
    """

    def __init__(self, items, path='.', tfms=None):
        # The parent container stores the raw items.
        super().__init__(items)
        self.path = Path(path)
        self.tfms = tfms

    def __repr__(self):
        # Parent representation followed by the base path on its own line.
        container_repr = super().__repr__()
        return f'{container_repr}\nPath: {self.path}'

    def new(self, items, cls=None):
        """Create a list of `items` sharing this list's path and transforms.

        `cls` defaults to this object's own class; used when the data is
        split into subsets later on.
        """
        target_cls = self.__class__ if cls is None else cls
        return target_cls(items, self.path, tfms=self.tfms)

    def get(self, i):
        """Return the item as-is; subclasses override this to load the item."""
        return i

    def _get(self, i):
        # Load the item, then apply the transforms on the fly.
        loaded = self.get(i)
        return compose(loaded, self.tfms)

    def __getitem__(self, idx):
        res = super().__getitem__(idx)
        if not isinstance(res, list):
            return self._get(res)
        # The parent returned several items: process each of them.
        return [self._get(o) for o in res]
A = ItemList(['1','2','3']); A
A[0]
We'll start off with another computer vision dataset Imagenette:
Imagenette is a subset of 10 easily classified classes from Imagenet (tench, English springer, cassette player, chain saw, church, French horn, garbage truck, gas pump, golf ball, parachute).
We'll download the dataset using the fastai datasets module and then write some functions to collect the filenames into a list.
path = datasets.untar_data(datasets.URLs.IMAGENETTE_160); path
path.ls()
(path/'val').ls()
path_tench = path/'val'/'n01440764'
img_path = path_tench.ls()[0]
img = PIL.Image.open(img_path); img
plt.imshow(img)
import numpy
imga = numpy.array(img); imga.shape
By opening our image in NumPy we can see it's simply a 3-dimensional array of ints between 0 and 255.
imga[:10, :10, 0]
#export
# Every file suffix in the stdlib `mimetypes` table whose MIME type string
# starts with 'image'.
image_extension = {
    suffix
    for suffix, mime in mimetypes.types_map.items()
    if mime.startswith('image')
}
Setify is handy for eliminating duplicate elements:
#export
def setify(x):
    """Return `x` itself when it is already a set, else `set(listify(x))`."""
    if isinstance(x, set):
        return x
    return set(listify(x))
setify(['a','b','a'])
test_eq({'a','a','b'}, setify(['a','a','b']))
test_eq({'a',1}, setify(['a',1]))
_get_files
will do the heavy lifting of collecting the specific files with the image extensions we want from a specific path.
It filters the filenames to check if they are in extensions and excludes system files starting with '.':
#export
def _get_files(p, fs, extensions=None):
    """Return `p/name` for every acceptable file name in `fs`.

    A name is accepted when it does not start with '.' and either
    `extensions` is empty/None or the lower-cased text after the name's last
    '.' (prefixed with '.') is contained in `extensions`.
    """
    base = Path(p)

    def wanted(name):
        if name.startswith('.'):
            return False  # hidden / system file
        if not extensions:
            return True   # no filter requested
        suffix = '.' + name.split('.')[-1].lower()
        return suffix in extensions

    return [base / name for name in fs if wanted(name)]
We can quickly scan a directory using the built-in os.scandir
which yields one entry per item in that directory; we collect the entries' names into a list:
fs = [o.name for o in os.scandir(path_tench)]
t = _get_files(path_tench, fs, image_extension)
fs[:3]
And our _get_files
function will return the full path name of whatever files in that folder meet the defined conditions:
t[:3]
Now let's write a recursive function that will go through a given folder as well as its subfolders and return a list of image files:
#export
def get_files(path, extensions=None, recurse=False, include=None):
    """Collect files under `path`, optionally filtered by extension.

    path:       directory to scan.
    extensions: suffixes to keep; compared lower-cased. Passed through
                `setify` first, then handed to `_get_files` for filtering.
    recurse:    when True, walk sub-directories with `os.walk`; otherwise
                only the regular files directly inside `path` are considered.
    include:    when recursing and not None, only the top-level folders
                whose names are in `include` are descended into.

    Returns the list produced by `_get_files`.
    """
    path = Path(path)
    extensions = {e.lower() for e in setify(extensions)}

    if not recurse:
        names = [entry.name for entry in os.scandir(path) if entry.is_file()]
        return _get_files(path, names, extensions=extensions)

    found = []
    for step, (dirpath, dirnames, filenames) in enumerate(os.walk(path)):
        # Assigning to dirnames[:] edits the list in place, which is how
        # os.walk (top-down) is told which sub-directories to visit next.
        if include is not None and step == 0:
            dirnames[:] = [d for d in dirnames if d in include]
        else:
            dirnames[:] = [d for d in dirnames if not d.startswith('.')]
        found += _get_files(dirpath, filenames, extensions)
    return found
get_files(path_tench, extensions=image_extension)[:3]
But if we just give it our original path
we'll need it to recursively go through each subfolder:
get_files(path, extensions=image_extension, recurse=True)[:3]
all_imgs = get_files(path, extensions=image_extension, recurse=True)
len(all_imgs)
13,000 filenames in 58.4 milliseconds
%timeit get_files(path, image_extension, recurse=True)
Now we can make an ImageList
by inheriting from ItemList
and defining how we want to get the files.
We just need two methods:
from_files
: is a classmethod and will be used to instantiate an ImageList (ItemList) by calling get_files
get
: opens the filename passed to it.
@classmethod
: Python only allows one init method per class. Using class methods it’s possible to add as many alternative constructors as necessary. Here we are using from_files
as the method for supplying the items for the ItemList
. It allows us to define unique constructors.
#export
class ImageList(ItemList):
    """An `ItemList` of image file paths that opens each file with PIL on access."""

    @classmethod
    def from_files(cls, path, extensions=None, recurse=True, include=None, **kwargs):
        """Alternative constructor: gather the files under `path` with `get_files`.

        path:       root folder to search; also stored as the list's path.
        extensions: suffixes to keep; falls back to `image_extension` when None.
        recurse:    forwarded to `get_files`.
        include:    forwarded to `get_files` (restricts which top-level
                    folders are searched when recursing).
        **kwargs:   forwarded to the class constructor (e.g. `tfms`).
        """
        if extensions is None:
            extensions = image_extension
        # Forward the caller's `include`; it was previously hard-coded to
        # None, so the argument had no effect.
        files = get_files(path, extensions, recurse=recurse, include=include)
        return cls(files, path, **kwargs)

    def get(self, fn):
        """Open the file `fn` with `PIL.Image.open` and return the result."""
        return PIL.Image.open(fn)
Now we can use the ImageList
class and instantiate it with from_files
which collects the image filenames.
il = ImageList.from_files(path)
And using the __getitem__
we can open an image:
il[0]
Transforms aren't only used for data augmentation. To allow total flexibility, ImageList
returns the raw PIL image. The first thing is to convert it to 'RGB' (or something else).
Transforms only need to be functions that take an element of the ItemList
and transform it.
If they need state, they can be defined as a class. Also, having them as a class allows to define an _order
attribute (default 0) that is used to sort the transforms.
#export
class Transform:
    """Base class for transforms; carries the default sort position."""

    # Sort key read by `compose` (its default `order_key` is "_order").
    _order = 0
Now let's make a Transform class which converts an image to RGB, just in case we have a grayscale image in the batch. A grayscale image has only 1 channel (instead of 3), which would present a problem for our model.
PIL's convert('RGB')
will open as RGB and copy the single channel to the other two if need be
#export
class MakeRGB(Transform):
    """Transform that converts an item to RGB through its `convert` method."""

    def __call__(self, x):
        rgb = x.convert('RGB')
        return rgb
But we can also just do this with a function and add a _order
attribute:
#export
def make_rgb(x):
    """Return `x.convert('RGB')`; function form of the RGB transform."""
    rgb = x.convert('RGB')
    return rgb


# Sort position used when transforms are ordered by their `_order` attribute.
make_rgb._order = 0
Let's make our ImageList
again and pass our convert transform:
il = ImageList.from_files(path, tfms=make_rgb)
And our __repr__
gives us the number of items and the first ten:
il
il[:1]
There are numerous ways the training and validation might be separated.
In most cases, when not dealing with public datasets, we'll need to write a function that can randomly separate a specific percentage of the data.
We'll start with a function that will solve our problem of having the training and validation split into separate folders and then build a general class that can take other functions to make the split.
path.ls()
fn = il.items[0]; fn
The items in our ImageList
are Paths so we can see the dirname easily.
Then we can call parent
twice to get to the train folder.
fn.parent.parent.as_posix().split('/')[-1]
Now we'll create a function to do this.
#export
def grandparent_splitter(fn, train_name="train", valid_name="valid"):
    """Classify a path by the name of its grandparent folder.

    Returns True when the folder two levels above `fn` is named `train_name`,
    False when it is named `valid_name`, and None otherwise.
    """
    grandparent = fn.parent.parent.as_posix()
    # Last '/'-separated component of the grandparent path.
    folder = grandparent.rsplit('/', 1)[-1]
    if folder == train_name:
        return True
    if folder == valid_name:
        return False
    return None
And a function that creates a mask based on the function you pass to it - in our case grandparent_splitter
- and returns the True
and False
elements in two distinct lists:
#export
def split_by_func(items, f):
    """Partition `items` using the mask `f(item)`.

    Returns `(tru, fal)`: the items whose mask value equals True and the
    items whose mask value equals False, each in original order. Items with
    any other mask value (e.g. None) end up in neither list.
    """
    mask = list(map(f, items))
    tru, fal = [], []
    for item, flag in zip(items, mask):
        # Equality (not truthiness) is intentional: None must match neither.
        if flag == True:
            tru.append(item)
        if flag == False:
            fal.append(item)
    return tru, fal
Make a splitter
function for our imagenette dataset:
splitter = partial(grandparent_splitter, valid_name='val')
train, valid = split_by_func(il.items, splitter)
len(train),len(valid)
SplitData
class
Now that we have a function for our dataset that can determine which files belong to which set for a given path, let's make a SplitData
class that can hold the training and validation set and be instantiated by a classmethod. In this case our class method instantiates it using split_by_func
:
#export
class SplitData:
    """Holds a training set and a validation set side by side.

    Attributes that are not found on the object itself are looked up on
    `train`.
    """

    def __init__(self, train, valid):
        self.train = train
        self.valid = valid

    def __getattr__(self, k):
        # Only called when normal lookup fails: delegate to the training set.
        return getattr(self.train, k)

    def __setstate__(self, data: Any):
        # Restore state straight into __dict__. Presumably defined so that
        # unpickling does not fall into __getattr__ before `train` exists.
        self.__dict__.update(data)

    @classmethod
    def split_by_func(cls, il, f):
        """Split `il.items` with the module-level `split_by_func` and wrap
        each part with `il.new`."""
        train_items, valid_items = split_by_func(il.items, f)
        return cls(il.new(train_items), il.new(valid_items))

    def __repr__(self):
        name = self.__class__.__name__
        return f'{name}\nTrain: {self.train}\nValid: {self.valid}\n'
sd = SplitData.split_by_func(il, splitter); sd
sd.train[0]
sd.valid[0]
A Processor is a transformation that is applied to all the inputs once at initialization, with some state computed on the training set that is then applied without modification on the validation set (and maybe the test set or at inference time on a single item).
Types of Processors:
NLP: for processing texts to tokenize, then numericalize them. We want the validation set to be numericalized with exactly the same vocabulary as the training set.
Tabular Data: where we want to fill missing values with (for instance) the median computed on the training set. That statistic is stored in the inner state of the Processor and applied on the validation set.
convert label strings to numbers in a consistent and reproducible way. So we create a list of possible labels in the training set, and then convert our labels to numbers based on this vocab.
This is the base Processor
class that does not really do anything. It has an instance method called process
that just returns whatever it is given.
#export
class Processor:
    """Base processor: `process` hands the items back unchanged."""

    def process(self, items):
        return items
CategoryProcessor
Now, in order to get labels for our image classification task, we'll need a Processor that is able to find and hold the categories it finds in the training set.
We'll write a function called uniqueify
that will return an ordered list of unique items from an iterable. An OrderedDict
is simply a dictionary that remembers insertion order.
#export
from collections import OrderedDict
def uniqueify(x, sort=False):
    """Return the distinct elements of `x` as a list.

    Elements keep their first-seen order unless `sort` is true, in which
    case the list is returned sorted.
    """
    unique = list(OrderedDict.fromkeys(x))
    return sorted(unique) if sort else unique
list(OrderedDict.fromkeys([2,1,1,3]).keys())
its = ['Apple','Apple','Carrot','Orange','Orange']
objs = list(OrderedDict.fromkeys(its).keys()); objs
{v:k for k,v in enumerate(objs)}
Now the main CategoryProcesser
class.
This processor will be able to handle two types of reference:
#export
class CategoryProcesser(Processor):
    """Maps category objects to integer ids and back.

    The vocabulary is built from the first collection the processor is
    called with and is reused, unchanged, on every later call.
    """

    def __init__(self):
        # State: list of unique objects; None until the first call.
        self.vocab = None

    def __call__(self, items):
        """Return the integer id of every element of `items`."""
        if self.vocab is None:
            self.vocab = uniqueify(items)
            # object -> index, numbered from 0 in vocab order
            self.otoi = {obj: idx for idx, obj in enumerate(self.vocab)}
        return list(map(self.proc1, items))

    def proc1(self, item):
        """Look up the id of a single object."""
        return self.otoi[item]

    def deprocess(self, idxs):
        """Map a collection of ids back to their objects."""
        assert self.vocab is not None
        return list(map(self.deproc1, idxs))

    def deproc1(self, idx):
        """Map a single id back to its object."""
        return self.vocab[idx]
Let's demonstrate what the CategoryProcessor
is meant to do.
It's not initialized with anything and the vocab
attribute is set to None
:
cp = CategoryProcesser()
assert cp.vocab == None
its
Then we __call__
it passing a list of strings.
For the training set the vocab
is None
so it will create a vocab
by sending that list of items to uniqueify
and then create a dictionary that maps each item to index number which is the category.
cp(its)
Now if it's the validation set, vocab
will not be none and all of the labels passed will be sent to proc1
and a category number will be returned:
cp.proc1('Apple')
And we can deprocess by passing a category number and getting label in return:
cp.deproc1(0)
Which is also the index of the vocab:
cp.vocab[0]
For our labelling block we need to map from whatever type of label our training set has (whether a string, int, or float) to a $y$ tensor that our model can understand and calculate the loss on.
When doing classification our categories are mutually exclusive, so the set of possible labels is determined on the training set and then applied to the validation set. This ensures our $x, y$ mapping is the same for both.
To accomplish this we'll need a Processor.
LabeledData
Class
For splitting the data into train and validation sets we needed the grandparent folder for this dataset.
For labelling we need the parent folder:
fn.parent.name
#export
def parent_labeler(fn):
    """Label a path with the name of the folder that directly contains it."""
    containing_folder = fn.parent
    return containing_folder.name
def _label_by_func(ds, f, cls=ItemList):
    """Apply `f` to every element of `ds.items` and wrap the results in
    `cls`, keeping `ds.path` as the path."""
    labels = list(map(f, ds.items))
    return cls(labels, path=ds.path)
This API is slightly confused and needs to be rethought.
#export
class LabeledData():
    """Pairs an item list `x` with a label list `y`.

    Both lists are run through their processors once, at construction time.
    Indexing returns an `(x, y)` pair.
    """

    def process(self, il, proc):
        """Return a new list holding `il.items` after running them through
        `proc` with `compose`."""
        processed = compose(il.items, proc)
        return il.new(processed)

    def __init__(self, x, y, proc_x =None, proc_y=None):
        self.x = self.process(x, proc_x)
        self.y = self.process(y, proc_y)
        # Keep the processors so processed values can be mapped back later.
        self.proc_x, self.proc_y = proc_x, proc_y

    def __repr__(self):
        name = self.__class__.__name__
        return f'{name}\nx: {self.x}\ny: {self.y}\n'

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

    def __len__(self):
        return len(self.x)

    def x_obj(self, idx):
        """Deprocessed form of `x[idx]`."""
        return self.obj(self.x, idx, self.proc_x)

    def y_obj(self, idx):
        """Deprocessed form of `y[idx]`."""
        return self.obj(self.y, idx, self.proc_y)

    def obj(self, items, idx, procs):
        """Fetch `items[idx]` and undo the processors in reverse order.

        For an int index, or a `torch.LongTensor` with `ndim` 0, each
        processor's `deproc1` is used; for any other index its `deprocess`
        is used.
        """
        is_single = isinstance(idx, int) or (isinstance(idx, torch.LongTensor) and not idx.ndim)
        item = items[idx]
        undo = 'deproc1' if is_single else 'deprocess'
        for proc in reversed(listify(procs)):
            item = getattr(proc, undo)(item)
        return item

    @classmethod
    def label_by_func(cls, il, f, proc_x=None, proc_y=None):
        """Build the labels by applying `f` to the items of `il`."""
        labels = _label_by_func(il, f)
        return cls(il, labels, proc_x=proc_x, proc_y=proc_y)
#export
def label_by_func(sd, f, proc_x=None, proc_y=None):
    """Label both halves of `sd` with `f` and return them as a `SplitData`.

    The same `proc_x` / `proc_y` objects are passed for the training and the
    validation set; the training set is labelled first.
    """
    def labelled(part):
        return LabeledData.label_by_func(part, f, proc_x=proc_x, proc_y=proc_y)

    train = labelled(sd.train)
    valid = labelled(sd.valid)
    return SplitData(train, valid)
ll = label_by_func(sd, parent_labeler, proc_y=CategoryProcesser())
ll
assert ll.train.proc_y == ll.valid.proc_y
ll.__class__
ll.train.__class__
ll.train.x.__class__
ll.train.x.items.__class__
ll.train.y.items[0], ll.train.y_obj(0)
ll.train.x[0]
What are transforms?
What is the general pattern?
What transforms do we need for images specifically?
Our first transform is easy - just resize each image to a given size.
ll.train[0][0]
ll.train[0][0].resize((128,128))
#export
class ResizeFixed(Transform):
    """Resize an item to a fixed size with bilinear resampling."""

    # Sort position among the transforms.
    _order = 10

    def __init__(self, size):
        # A single int is expanded to the pair (size, size).
        self.size = (size, size) if isinstance(size, int) else size

    def __call__(self, item):
        resized = item.resize(self.size, PIL.Image.BILINEAR)
        return resized
to_byte_tensor
transform¶Happens after resizing.
From TorchVision:
#export
def to_byte_tensor(item):
    """Turn `item` into a byte tensor with the channel axis first.

    `item` must provide `tobytes()` and a `size` that unpacks as
    (width, height) - e.g. a PIL image. The raw bytes are viewed as
    (height, width, channels) and permuted to (channels, height, width).
    """
    width, height = item.size
    storage = torch.ByteStorage.from_buffer(item.tobytes())
    flat = torch.ByteTensor(storage)
    # Put the channel axis first for PyTorch.
    return flat.view(height, width, -1).permute(2, 0, 1)


# Sort position among the transforms.
to_byte_tensor._order = 20
to_byte_tensor(ll.train[0][0]).shape
to_float_tensor
transform
Finally, we need the images to be tensors of floats between 0 and 1. This needs to happen after to_byte_tensor
so _order
is set to 30
#export
def to_float_tensor(item):
    """Convert `item` with `.float()` and divide the result by 255 in place."""
    as_float = item.float()
    # NOTE(review): div_ is in-place on whatever `.float()` returned; if that
    # is the input tensor itself, the input is modified - confirm callers
    # only pass byte tensors.
    as_float.div_(255.)
    return as_float


# Sort position among the transforms.
to_float_tensor._order = 30
to_float_tensor(to_byte_tensor(ll.train[0][0]))[:2]
It's useful to have a show image function
#export
def show_image(im, figsize=(3,3)):
    """Draw the tensor `im` in a new matplotlib figure with the axes hidden.

    `im` is permuted with (1, 2, 0) before plotting, i.e. its first axis is
    moved to the end.
    """
    plt.figure(figsize=figsize)
    plt.axis('off')
    channel_last = im.permute(1, 2, 0)
    plt.imshow(channel_last)
Now let's put all of the preceding pieces together:
tfms = [make_rgb, ResizeFixed(128), to_byte_tensor, to_float_tensor]
il = ImageList.from_files(path, tfms=tfms)
sd = SplitData.split_by_func(il, splitter)
ll = label_by_func(sd, parent_labeler, proc_y=CategoryProcesser())
x, y = ll.train[0]
show_image(x)
Now we have our datasets we can put them into a modified DataBunch
object:
bs = 64
train_dl, valid_dl = get_dls(ll.train, ll.valid, bs)
x, y = next(iter(train_dl))
x.shape
We can now see the first image in the batch.
This image has had transforms applied to it:
show_image(x[0])
See the vocab for it - the label:
ll.train.proc_y.vocab[y[0]]
y
This is basically what we had before except now we are adding two convenience attributes c_in
and c_out
which means we can have our models use the number of in and out channels from the databunch:
#export
class DataBunch:
    """Bundles a training and a validation data loader.

    `c_in` and `c_out` are optional values stored as given.
    """

    def __init__(self, train_dl, valid_dl, c_in=None, c_out=None):
        self.train_dl = train_dl
        self.valid_dl = valid_dl
        self.c_in = c_in
        self.c_out = c_out

    @property
    def train_ds(self):
        """The `dataset` attribute of the training loader."""
        return self.train_dl.dataset

    @property
    def valid_ds(self):
        """The `dataset` attribute of the validation loader."""
        return self.valid_dl.dataset
And add a helper function to Splitdata
that lets us return a DataBunch
directly:
(monkey patch)
#export
def databunchify(sd, bs, c_in=None, c_out=None, **kwargs):
    """Build a `DataBunch` from the train/valid sets held by `sd`.

    The two loaders come from `get_dls(sd.train, sd.valid, bs=bs, **kwargs)`;
    `c_in` and `c_out` are passed on to `DataBunch` unchanged.
    """
    train_dl, valid_dl = get_dls(sd.train, sd.valid, bs=bs, **kwargs)
    return DataBunch(train_dl, valid_dl, c_in=c_in, c_out=c_out)


# Monkey patch: expose the helper as `SplitData.to_databunch`.
SplitData.to_databunch = databunchify
Full summary of collecting data to DataBunch:
path = datasets.untar_data(datasets.URLs.IMAGENETTE_160) # downloads and returns a path to folder
tfms = [make_rgb, ResizeFixed(128), to_byte_tensor, to_float_tensor] # transforms to be applied to images
il = ImageList.from_files(path, tfms=tfms) # Imagelist from files
sd = SplitData.split_by_func(il, partial(grandparent_splitter, valid_name="val")) # Splitdata by function
ll = label_by_func(sd, parent_labeler, proc_y=CategoryProcesser()) # label the data by parent folder
data = ll.to_databunch(bs, c_in=3, c_out=10)
show_image(data.train_ds[0][0])
callbacks = [partial(AvgStatsCallback, accuracy), CudaCallback]
Finding the mean and standard deviation of a batch to normalize our data.
m = x.mean((0,2,3)).cuda() # mean of each channel
s = x.std((0,2,3)).cuda() # std of each channel
m, s
#export
def normalize_chan(x, mean, std):
    """Return `(x - mean) / std` with `mean` and `std` broadcast over two
    extra trailing axes.

    Both statistics are indexed with `[..., None, None]`, which appends two
    length-1 axes so that they line up against the last two axes of `x`.
    """
    mean_b = mean[..., None, None]
    std_b = std[..., None, None]
    centered = x - mean_b
    return centered / std_b
# Hard-coded three-element mean and std used for normalisation
# (presumably per-channel statistics measured on a batch as in the cell
# above - TODO confirm where the numbers come from).
_m = tensor([0.4419, 0.4332, 0.4166])
_s = tensor([0.2732, 0.2724, 0.2971])
# `normalize_chan` with the statistics pre-bound. NOTE(review): `.cuda()` is
# called when this line runs, so a CUDA device appears to be required at
# that point - confirm before importing on a CPU-only machine.
norm_imagenette = partial(normalize_chan, mean=_m.cuda(), std=_s.cuda())
callbacks.append(partial(BatchTransformXCallback, norm_imagenette))
nb_auto_export()