subhankarg's picture
Upload folder using huggingface_hub
0558aa4 verified
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from typing import Dict, List, Tuple
import numpy as np
from numpy import ndarray
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, RobustScaler
from nemo.utils import logging
__all__ = ["IntCode", "FloatCode", "CategoryCode", "ColumnCodes"]
class Code(object):
def compute_code(self, data_series: ndarray):
"""
@params:
data_series: an array of input data used to calculate mapping
"""
raise NotImplementedError()
def __init__(self, col_name: str, code_len: int, start_id: int, fillall: bool = True, hasnan: bool = True):
"""
@params:
col_name: name of the column
code_len: number of tokens used to code the column.
start_id: offset for token_id.
fillall: if True, reserve space for digit number even the digit number is
not present in the data_series. Otherwise, only reserve space for the numbers
in the data_series.
hasnan: if True, reserve space for nan
"""
self.name = col_name
self.code_len = code_len
self.start_id = start_id
self.end_id = start_id
self.fillall = fillall
self.hasnan = hasnan
def encode(self, item: str) -> List[int]:
raise NotImplementedError()
def decode(self, ids: List[int]) -> str:
raise NotImplementedError()
@property
def code_range(self) -> List[Tuple[int, int]]:
"""
get the vocab id range for each of the encoded tokens
@returns [(min, max), (min, max), ...]
"""
return [(self.start_id, self.end_id)]
class IntCode(Code):
def __init__(
self, col_name: str, code_len: int, start_id: int, fillall: bool = True, base: int = 100, hasnan: bool = True
):
super().__init__(col_name, code_len, start_id, fillall, hasnan)
self.base = base
self.int_min: int = None
def compute_code(self, data_series: ndarray):
significant_val = self.array_convert_to_int(data_series)
digits_id_to_item = [{} for _ in range(self.code_len)]
digits_item_to_id = [{} for _ in range(self.code_len)]
for i in range(self.code_len):
id_to_item = digits_id_to_item[i]
item_to_id = digits_item_to_id[i]
v = (significant_val // self.base ** i) % self.base
if self.fillall:
uniq_items = range(0, self.base)
else:
uniq_items = sorted(np.unique(v).tolist())
for k in range(len(uniq_items)):
item = str(uniq_items[k])
item_to_id[item] = self.end_id
id_to_item[self.end_id] = item
self.end_id += 1
self.digits_id_to_item = digits_id_to_item
self.digits_item_to_id = digits_item_to_id
self.NA_token = 'nan'
if self.hasnan:
self.end_id += 1 # add the N/A token
codes = []
ranges = self.code_range
for i in ranges:
codes.append(i[1] - 1)
self.NA_token_id = codes
def array_convert_to_int(self, val: ndarray):
val = val.astype(int)
self.int_min = val.min()
return val - self.int_min
def convert_to_int(self, val: float) -> int:
return int(val) - self.int_min
def reverse_convert_to_int(self, val: int) -> int:
return val + self.int_min
@property
def code_range(self) -> List[Tuple[int, int]]:
"""
get the vocab id range for each of the encoded tokens
@returns [(min, max), (min, max), ...]
"""
# first largest digits
outputs = []
c = 0
for i in reversed(range(self.code_len)):
ids = self.digits_id_to_item[i].keys()
if c == 0:
if self.hasnan:
outputs.append((min(ids), max(ids) + 2)) # the first token contains the N/A
else:
outputs.append((min(ids), max(ids) + 1)) # non N/A
else:
outputs.append((min(ids), max(ids) + 1))
c += 1
return outputs
def encode(self, item: str) -> List[int]:
if self.hasnan and item == self.NA_token:
return self.NA_token_id
elif not self.hasnan and item == self.NA_token:
raise ValueError(f"colum {self.name} cannot handle nan, please set hasnan=True")
val = float(item)
val_int = self.convert_to_int(val)
digits = []
for i in range(self.code_len):
digit = (val_int // self.base ** i) % self.base
digits.append(str(digit))
if (val_int // self.base ** self.code_len) != 0:
raise ValueError("not right length")
codes = []
for i in reversed(range(self.code_len)):
digit_str = digits[i]
if digit_str in self.digits_item_to_id[i]:
codes.append(self.digits_item_to_id[i][digit_str])
else:
# find the nearest encode id
allowed_digits = np.array([int(d) for d in self.digits_item_to_id[i].keys()])
near_id = np.argmin(np.abs(allowed_digits - int(digit_str)))
digit_str = str(allowed_digits[near_id])
codes.append(self.digits_item_to_id[i][digit_str])
logging.warning('out of domain num is encounterd, use nearest code')
return codes
def decode(self, ids: List[int]) -> str:
if self.hasnan and ids[0] == self.NA_token_id[0]:
return self.NA_token
v = 0
for i in reversed(range(self.code_len)):
digit = int(self.digits_id_to_item[i][ids[self.code_len - i - 1]])
v += digit * self.base ** i
v = self.reverse_convert_to_int(v)
return str(v)
class FloatCode(IntCode):
def __init__(
self,
col_name: str,
code_len: int,
start_id: int,
fillall: bool = True,
base: int = 100,
hasnan: bool = True,
transform: str = 'quantile',
):
super().__init__(col_name, code_len, start_id, fillall, base, hasnan)
if transform == 'yeo-johnson':
self.scaler = PowerTransformer(standardize=True)
elif transform == 'quantile':
self.scaler = QuantileTransformer(output_distribution='uniform', n_quantiles=100)
elif transform == 'robust':
self.scaler = RobustScaler()
else:
raise ValueError('Supported data transformations are "yeo-johnson", "quantile", and "robust"')
def convert_to_int(self, val: float) -> int:
val = np.expand_dims(np.array(val), axis=0)
values = self.scaler.transform(val[:, None])[:, 0] - self.mval
values = (values * self.base ** self.extra_digits).astype(int)
output = values[0]
return output
def array_convert_to_int(self, val: ndarray):
values = self.scaler.fit_transform(val[:, None])[:, 0]
self.mval = values.min()
values = values - self.mval
digits = int(math.log(values.max(), self.base)) + 1
# extra digits used for 'float' part of the number
extra_digits = self.code_len - digits
if extra_digits < 0:
raise ValueError("need large length to code the nummber")
self.extra_digits = extra_digits
values = (values * self.base ** self.extra_digits).astype(int)
return values
def reverse_convert_to_int(self, val: int) -> float:
val = val / self.base ** self.extra_digits
val = np.expand_dims(np.array(val), axis=0)
v = self.scaler.inverse_transform(val[:, None] + self.mval)[0, 0]
return v
def decode(self, ids: List[int]) -> str:
if self.hasnan and ids[0] == self.NA_token_id[0]:
return self.NA_token
v = 0
for i in reversed(range(self.code_len)):
digit = int(self.digits_id_to_item[i][ids[self.code_len - i - 1]])
v += digit * self.base ** i
v = self.reverse_convert_to_int(v)
accuracy = max(int(abs(np.log10(0.1 / self.base ** self.extra_digits))), 1)
return f"{v:.{accuracy}f}"
class CategoryCode(Code):
def __init__(self, col_name: str, start_id: int):
super().__init__(col_name, 1, start_id, True, False)
def compute_code(self, data_series: ndarray):
uniq_items = np.unique(data_series).tolist()
id_to_item = {}
item_to_id = {}
for i in range(len(uniq_items)):
item = str(uniq_items[i])
item_to_id[item] = self.end_id
id_to_item[self.end_id] = item
self.end_id += 1
self.id_to_item = id_to_item
self.item_to_id = item_to_id
def encode(self, item) -> List[int]:
return [self.item_to_id[item]]
def decode(self, ids: List[int]) -> str:
return self.id_to_item[ids[0]]
column_map = {"int": IntCode, "float": FloatCode, "category": CategoryCode}
class ColumnCodes(object):
def __init__(self):
self.column_codes: Dict[str, Code] = {}
self.columns = []
self.sizes = []
@property
def vocab_size(self):
return self.column_codes[self.columns[-1]].end_id
def register(self, name: str, ccode: Code):
self.columns.append(name)
self.column_codes[name] = ccode
self.sizes.append(ccode.code_len)
def encode(self, col: str, item: str) -> List[int]:
if col in self.column_codes:
return self.column_codes[col].encode(item)
else:
raise ValueError(f"cannot encode {col} {item}")
def decode(self, col: str, ids: List[int]) -> str:
if col in self.column_codes:
return self.column_codes[col].decode(ids)
else:
raise ValueError("cannot decode")
def get_range(self, column_id: int) -> List[Tuple[int, int]]:
return self.column_codes[self.columns[column_id]].code_range
@classmethod
def get_column_codes(cls, column_configs, example_arrays):
column_codes = cls()
beg = 0
cc = None
for config in column_configs:
col_name = config['name']
coder = column_map[config['code_type']]
args = config.get('args', {})
start_id = beg if cc is None else cc.end_id
args['start_id'] = start_id
args['col_name'] = col_name
cc = coder(**args)
cc.compute_code(example_arrays[col_name])
column_codes.register(col_name, cc)
return column_codes