numpy-ml\numpy_ml\preprocessing\nlp.py
# Import the required libraries and modules
import re
import heapq
import os.path as op
from collections import Counter, OrderedDict, defaultdict

import numpy as np

# English stop-word list from the "Glasgow Information Retrieval Group"
# (the stop-word string itself is omitted from this excerpt)
_STOP_WORDS = set(
    "".split(" "),
)

# Regular expressions used to split strings into word tokens
_WORD_REGEX = re.compile(r"(?u)\b\w\w+\b")  # sklearn default
_WORD_REGEX_W_PUNC = re.compile(r"(?u)\w+|[^a-zA-Z0-9\s]")
_WORD_REGEX_W_PUNC_AND_WHITESPACE = re.compile(r"(?u)s?\w+\s?|\s?[^a-zA-Z0-9\s]\s?")

# Regular expression matching the byte values of ASCII punctuation characters
_PUNC_BYTE_REGEX = re.compile(
    r"(33|34|35|36|37|38|39|40|41|42|43|44|45|"
    r"46|47|58|59|60|61|62|63|64|91|92|93|94|"
    r"95|96|123|124|125|126)",
)

# Punctuation characters
_PUNCTUATION = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"

# Translation table used to strip punctuation from strings
_PUNC_TABLE = str.maketrans("", "", _PUNCTUATION)
def ngrams(sequence, N):
    """Return all `N`-grams of the elements in `sequence`"""
    assert N >= 1
    return list(zip(*[sequence[i:] for i in range(N)]))
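# A quick usage check of the helper above (illustrative):
#
#   >>> ngrams(["the", "cat", "sat", "down"], 2)
#   [('the', 'cat'), ('cat', 'sat'), ('sat', 'down')]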
def tokenize_whitespace(
    line, lowercase=True, filter_stopwords=True, filter_punctuation=True, **kwargs,
):
    """
    Split a string at any whitespace characters, optionally removing
    punctuation and stop-words in the process.
    """
    line = line.lower() if lowercase else line
    words = line.split()
    words = [strip_punctuation(w) for w in words] if filter_punctuation else words
    return remove_stop_words(words) if filter_stopwords else words
def tokenize_words(
    line, lowercase=True, filter_stopwords=True, filter_punctuation=True, **kwargs,
):
    """
    Split a string into individual words, optionally removing punctuation and
    stop-words in the process.
    """
    REGEX = _WORD_REGEX if filter_punctuation else _WORD_REGEX_W_PUNC
    words = REGEX.findall(line.lower() if lowercase else line)
    return remove_stop_words(words) if filter_stopwords else words
def tokenize_words_bytes(
    line,
    lowercase=True,
    filter_stopwords=True,
    filter_punctuation=True,
    encoding="utf-8",
    **kwargs,
):
    """
    Split a string into individual words, optionally removing punctuation and
    stop-words in the process, then convert each word to a list of bytes.
    """
    # Tokenize into words first, honoring the casing / filtering options
    words = tokenize_words(
        line,
        lowercase=lowercase,
        filter_stopwords=filter_stopwords,
        filter_punctuation=filter_punctuation,
        **kwargs,
    )
    # Convert each word to a string of space-separated byte values
    words = [" ".join([str(i) for i in w.encode(encoding)]) for w in words]
    return words
def tokenize_bytes_raw(line, encoding="utf-8", splitter=None, **kwargs):
    """
    Convert the characters in `line` to a collection of bytes, each represented
    as an integer between 0 and 255.
    """
    # Encode the string as a single string of space-separated byte values
    byte_str = [" ".join([str(i) for i in line.encode(encoding)])]
    # If splitting on punctuation, split at punctuation bytes before returning
    if splitter == "punctuation":
        byte_str = _PUNC_BYTE_REGEX.sub(r"-\1-", byte_str[0]).split("-")
    return byte_str
def bytes_to_chars(byte_list, encoding="utf-8"):
    """
    Decode bytes (represented as integers between 0 and 255) into characters in
    the specified encoding.
    """
    # Convert each byte to a hex string, zero-padding single digits
    hex_array = [hex(a).replace("0x", "") for a in byte_list]
    hex_array = " ".join([h if len(h) > 1 else f"0{h}" for h in hex_array])
    # Convert the hex string to a bytearray and decode it with `encoding`
    return bytearray.fromhex(hex_array).decode(encoding)
def tokenize_chars(line, lowercase=True, filter_punctuation=True, **kwargs):
    """
    Split a string into individual characters, optionally removing punctuation
    in the process.
    """
    line = line.lower() if lowercase else line
    line = strip_punctuation(line) if filter_punctuation else line
    # Collapse runs of whitespace to a single space before splitting into chars
    chars = list(re.sub(" {2,}", " ", line).strip())
    return chars
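# A minimal usage sketch of the tokenizers above. The word-level output depends
# on the Glasgow stop-word list (assumed here to contain "the" and "over"):
#
#   >>> tokenize_words("The quick brown fox jumps over the lazy dog!")
#   ['quick', 'brown', 'fox', 'jumps', 'lazy', 'dog']
#   >>> tokenize_chars("Hi there!")
#   ['h', 'i', ' ', 't', 'h', 'e', 'r', 'e']
#   >>> tokenize_bytes_raw("Hi!")
#   ['72 105 33']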
def remove_stop_words(words):
    """Remove stop words from a list of word strings"""
    return [w for w in words if w.lower() not in _STOP_WORDS]


def strip_punctuation(line):
    """Remove punctuation from a string"""
    return line.translate(_PUNC_TABLE).strip()


#######################################################################
# Byte-Pair Encoder #
#######################################################################
class BytePairEncoder(object):def __init__(self, max_merges=3000, encoding="utf-8"):"""A byte-pair encoder for sub-word embeddings.Notes-----Byte-pair encoding [1][2] is a compression algorithm that iterativelyreplaces the most frequently ocurring byte pairs in a set of documentswith a new, single token. It has gained popularity as a preprocessingstep for many NLP tasks due to its simplicity and expressiveness: usinga base coebook of just 256 unique tokens (bytes), any string can beencoded.References----------.. [1] Gage, P. (1994). A new algorithm for data compression. *CUsers Journal, 12(2)*, 23–38... [2] Sennrich, R., Haddow, B., & Birch, A. (2015). Neural machinetranslation of rare words with subword units, *Proceedings of the54th Annual Meeting of the Association for ComputationalLinguistics,* 1715-1725.Parameters----------max_merges : intThe maximum number of byte pair merges to perform during the:meth:`fit` operation. Default is 3000.encoding : strThe encoding scheme for the documents used to train the encoder.Default is `'utf-8'`."""# 初始化参数字典self.parameters = {"max_merges": max_merges,"encoding": encoding,}# 初始化字节到标记和标记到字节的有序字典。字节以十进制表示为0到255之间的整数。# 在255之前,标记和字节表示之间存在一对一的对应关系。self.byte2token = OrderedDict({i: i for i in range(256)})self.token2byte = OrderedDict({v: k for k, v in self.byte2token.items()})# 在给定语料库上训练一个字节对编码表def fit(self, corpus_fps, encoding="utf-8"):"""Train a byte pair codebook on a set of documents.Parameters----------corpus_fps : str or list of strsThe filepath / list of filepaths for the document(s) to be used tolearn the byte pair codebook.encoding : strThe text encoding for documents. Common entries are either 'utf-8'(no header byte), or 'utf-8-sig' (header byte). Default is'utf-8'."""# 创建一个词汇表对象,用于存储字节对编码表vocab = (Vocabulary(lowercase=False,min_count=None,max_tokens=None,filter_stopwords=False,filter_punctuation=False,tokenizer="bytes",)# 在给定语料库上拟合词汇表.fit(corpus_fps, encoding=encoding)# 获取词汇表中的计数信息.counts)# 迭代地合并跨文档中最常见的字节二元组for _ in range(self.parameters["max_merges"]):# 获取词汇表中的字节二元组计数信息pair_counts = self._get_counts(vocab)# 找到出现次数最多的字节二元组most_common_bigram = max(pair_counts, key=pair_counts.get)# 合并最常见的字节二元组到词汇表中vocab = self._merge(most_common_bigram, vocab)# 初始化一个空集合,用于存储字节标记token_bytes = set()# 遍历词汇表中的键for k in vocab.keys():# 将键按空格分割,筛选包含"-"的字节标记token_bytes = token_bytes.union([w for w in k.split(" ") if "-" in w])# 遍历字节标记集合for i, t in enumerate(token_bytes):# 将字节标记转换为元组形式byte_tuple = tuple(int(j) for j in t.split("-"))# 将字节标记映射到对应的标记索引self.token2byte[256 + i] = byte_tuple# 将字节标记索引映射到对应的字节标记self.byte2token[byte_tuple] = 256 + i# 返回当前对象return self# 获取词汇表中的字节二元组计数信息def _get_counts(self, vocab):"""Collect bigram counts for the tokens in vocab"""# 初始化一个默认字典,用于存储字节二元组计数pair_counts = defaultdict(int)# 遍历词汇表中的单词和计数信息for word, count in vocab.items():# 生成单词的二元组pairs = ngrams(word.split(" "), 2)# 遍历单词的二元组for p in pairs:# 更新字节二元组计数信息pair_counts[p] += count# 返回字节二元组计数信息return pair_counts# 将给定的二元组替换为单个标记,并相应更新词汇表def _merge(self, bigram, vocab):v_out = {}# 转义二元组中的单词,用于正则表达式匹配bg = re.escape(" ".join(bigram))# 创建匹配二元组的正则表达式bigram_regex = re.compile(r"(?<!\S)" + bg + r"(?!\S)")# 遍历词汇表中的单词for word in vocab.keys():# 将匹配到的二元组替换为连接符"-"w_out = bigram_regex.sub("-".join(bigram), word)v_out[w_out] = vocab[word]return v_out# 将文本中的单词转换为其字节对编码的标记IDdef transform(self, text):"""Transform the words in `text` into their byte pair encoded token IDs.Parameters----------text: str or list of `N` stringsThe list of strings to encodeReturns-------codes : list of `N` listsA list of byte pair 
token IDs for each of the `N` strings in`text`.Examples-------->>> B = BytePairEncoder(max_merges=100).fit("./example.txt")>>> encoded_tokens = B.transform("Hello! How are you 😁 ?")>>> encoded_tokens[[72, 879, 474, ...]]"""# 如果输入是字符串,则转换为列表if isinstance(text, str):text = [text]# 对文本中的每个字符串进行转换return [self._transform(string) for string in text]# 将单个文本字符串转换为字节对 ID 列表def _transform(self, text):# 获取参数配置P = self.parameters# 将文本字符串转换为原始字节流_bytes = tokenize_bytes_raw(text, encoding=P["encoding"])# 初始化编码结果列表encoded = []# 遍历每个字节对for w in _bytes:l, r = 0, len(w)# 将字节对转换为整数列表w = [int(i) for i in w.split(" ")]# 循环处理字节对while l < len(w):candidate = tuple(w[l:r])# 如果候选字节对长度大于1且在词汇表中if len(candidate) > 1 and candidate in self.byte2token:# 将候选字节对的 ID 添加到编码结果列表中encoded.append(self.byte2token[candidate])l, r = r, len(w)# 如果候选字节对长度为1elif len(candidate) == 1:# 将候选字节的 ID 添加到编码结果列表中encoded.append(candidate[0])l, r = r, len(w)else:# 如果候选字节对不在词汇表中,则减小上下文窗口大小并重试r -= 1# 返回编码结果列表return encodeddef inverse_transform(self, codes):"""Transform an encoded sequence of byte pair codeword IDs back intohuman-readable text.Parameters----------codes : list of `N` listsA list of `N` lists. Each sublist is a collection of integerbyte-pair token IDs representing a particular text string.Returns-------text: list of `N` stringsThe decoded strings corresponding to the `N` sublists in `codes`.Examples-------->>> B = BytePairEncoder(max_merges=100).fit("./example.txt")>>> encoded_tokens = B.transform("Hello! How are you 😁 ?")>>> encoded_tokens[[72, 879, 474, ...]]>>> B.inverse_transform(encoded_tokens)["Hello! How are you 😁 ?"]"""# 如果输入的codes是一个整数,将其转换为包含一个列表的形式if isinstance(codes[0], int):codes = [codes]decoded = []P = self.parameters# 遍历codes中的每个列表for code in codes:# 将每个token转换为对应的字节_bytes = [self.token2byte[t] if t > 255 else [t] for t in code]# 将字节列表展开为一维列表_bytes = [b for blist in _bytes for b in blist]# 将字节转换为字符并添加到decoded列表中decoded.append(bytes_to_chars(_bytes, encoding=P["encoding"]))return decoded@propertydef codebook(self):"""A list of the learned byte pair codewords, decoded into human-readableformat"""# 返回学习到的字节对编码的人类可读形式return [self.inverse_transform(t)[0]for t in self.byte2token.keys()if isinstance(t, tuple)]@propertydef tokens(self):"""A list of the byte pair codeword IDs"""# 返回字节对编码的ID列表return list(self.token2byte.keys())
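# The heart of BytePairEncoder.fit is the count/merge cycle sketched below on a
# toy vocabulary. This is an illustrative, self-contained sketch of the idea;
# the helper name `bpe_merge_step` is hypothetical and not part of nlp.py:
import re
from collections import defaultdict


def bpe_merge_step(vocab):
    """Count adjacent symbol pairs and merge the most frequent one with "-"."""
    pair_counts = defaultdict(int)
    for word, count in vocab.items():
        symbols = word.split(" ")
        for pair in zip(symbols, symbols[1:]):
            pair_counts[pair] += count
    if not pair_counts:
        return vocab
    best = max(pair_counts, key=pair_counts.get)
    # Replace the winning pair wherever it occurs as two whole, adjacent symbols
    pattern = re.compile(r"(?<!\S)" + re.escape(" ".join(best)) + r"(?!\S)")
    return {pattern.sub("-".join(best), w): c for w, c in vocab.items()}


# Each pass merges one more pair, mirroring a single iteration of the fit loop
toy_vocab = {"l o w": 5, "l o w e r": 2, "n e w e s t": 6}
for _ in range(3):
    toy_vocab = bpe_merge_step(toy_vocab)
print(toy_vocab)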
# Node: a binary-tree node used to build the Huffman tree
class Node(object):
    def __init__(self, key, val):
        self.key = key
        self.val = val
        self.left = None
        self.right = None

    def __gt__(self, other):
        """Greater than"""
        if not isinstance(other, Node):
            return -1
        return self.val > other.val

    def __ge__(self, other):
        """Greater than or equal to"""
        if not isinstance(other, Node):
            return -1
        return self.val >= other.val

    def __lt__(self, other):
        """Less than"""
        if not isinstance(other, Node):
            return -1
        return self.val < other.val

    def __le__(self, other):
        """Less than or equal to"""
        if not isinstance(other, Node):
            return -1
        return self.val <= other.val


# HuffmanEncoder: builds a Huffman tree over the tokens in a corpus and derives
# a binary codeword for each token (frequent tokens get shorter codes)
class HuffmanEncoder(object):# 为文本中的标记构建一个哈夫曼树,并计算每个标记的二进制编码。# 在哈夫曼编码中,出现频率更高的标记通常使用较少的位表示。哈夫曼编码产生了所有方法中对单独编码标记的最小期望码字长度。# 哈夫曼编码对应于通过二叉树的路径,其中1表示“向右移动”,0表示“向左移动”。与标准二叉树相反,哈夫曼树是自底向上构建的。构造始于初始化一个最小堆优先队列,其中包含语料库中的每个标记,优先级对应于标记频率。在每一步中,语料库中最不频繁的两个标记被移除,并成为一个父伪标记的子节点,其“频率”是其子节点频率的总和。将这个新的父伪标记添加到优先队列中,并递归重复这个过程,直到没有标记剩余。# 参数# text: 字符串列表或Vocabulary类的实例# 标记化的文本或用于构建哈夫曼编码的预训练Vocabulary对象。def fit(self, text):# 构建哈夫曼树self._build_tree(text)# 生成编码self._generate_codes()def transform(self, text):"""Transform the words in `text` into their Huffman-code representations.Parameters----------text: list of `N` stringsThe list of words to encodeReturns-------codes : list of `N` binary stringsThe encoded words in `text`"""# 如果输入的是字符串,则转换为包含该字符串的列表if isinstance(text, str):text = [text]# 遍历文本中的每个单词for token in set(text):# 如果单词不在 Huffman 树中,则抛出警告并跳过if token not in self._item2code:raise Warning("Token '{}' not in Huffman tree. Skipping".format(token))# 返回每个单词的 Huffman 编码return [self._item2code.get(t, None) for t in text]def inverse_transform(self, codes):"""Transform an encoded sequence of bit-strings back into words.Parameters----------codes : list of `N` binary stringsA list of encoded bit-strings, represented as strings.Returns-------text: list of `N` stringsThe decoded text."""# 如果输入的是字符串,则转换为包含该字符串的列表if isinstance(codes, str):codes = [codes]# 遍历编码序列中的每个编码for code in set(codes):# 如果编码不在 Huffman 树中,则抛出警告并跳过if code not in self._code2item:raise Warning("Code '{}' not in Huffman tree. Skipping".format(code))# 返回每个编码对应的单词return [self._code2item.get(c, None) for c in codes]@propertydef tokens(self):"""A list the unique tokens in `text`"""# 返回 Huffman 树中的所有唯一单词return list(self._item2code.keys())@propertydef codes(self):"""A list with the Huffman code for each unique token in `text`"""# 返回 Huffman 树中每个唯一单词的 Huffman 编码return list(self._code2item.keys())def _counter(self, text):counts = {}# 统计文本中每个单词的出现次数for item in text:counts[item] = counts.get(item, 0) + 1return counts# 构建哈夫曼树def _build_tree(self, text):"""Construct Huffman Tree"""# 初始化优先队列PQ = []# 如果输入是 Vocabulary 对象,则使用其 counts 属性if isinstance(text, Vocabulary):counts = text.countselse:# 否则使用 _counter 方法计算频率counts = self._counter(text)# 将每个字符及其频率作为节点加入优先队列for (k, c) in counts.items():PQ.append(Node(k, c))# 创建一个优先队列,优先级为频率heapq.heapify(PQ)# 构建哈夫曼树while len(PQ) > 1:node1 = heapq.heappop(PQ) # 弹出频率最小的节点node2 = heapq.heappop(PQ) # 弹出频率第二小的节点parent = Node(None, node1.val + node2.val)parent.left = node1parent.right = node2heapq.heappush(PQ, parent)self._root = heapq.heappop(PQ)# 生成编码def _generate_codes(self):current_code = ""self._item2code = {}self._code2item = {}self._build_code(self._root, current_code)# 递归构建编码def _build_code(self, root, current_code):if root is None:returnif root.key is not None:# 将叶子节点的字符与编码对应存储self._item2code[root.key] = current_codeself._code2item[current_code] = root.keyreturn# 0 = 向左移动,1 = 向右移动self._build_code(root.left, current_code + "0")self._build_code(root.right, current_code + "1")
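# The tree construction described above (pop the two least-frequent nodes,
# merge them under a parent whose count is their sum, repeat) can be summarized
# in a short standalone sketch. `huffman_codes` is a hypothetical helper, not
# the HuffmanEncoder API; codes are built by prepending 0 for the left branch
# and 1 for the right branch at every merge:
import heapq
from collections import Counter


def huffman_codes(tokens):
    """Return a {token: bit-string} map built bottom-up with a min-heap."""
    counts = Counter(tokens)
    codes = {sym: "" for sym in counts}
    if len(counts) == 1:
        return {sym: "0" for sym in counts}
    # Heap entries: [frequency, unique tie-breaker, tuple of member symbols]
    heap = [[freq, i, (sym,)] for i, (sym, freq) in enumerate(counts.items())]
    heapq.heapify(heap)
    while len(heap) > 1:
        lo = heapq.heappop(heap)  # least frequent subtree -> left branch (0)
        hi = heapq.heappop(heap)  # next least frequent -> right branch (1)
        for sym in lo[2]:
            codes[sym] = "0" + codes[sym]
        for sym in hi[2]:
            codes[sym] = "1" + codes[sym]
        heapq.heappush(heap, [lo[0] + hi[0], lo[1], lo[2] + hi[2]])
    return codes


print(huffman_codes(list("aaaabbbccd")))  # frequent symbols receive shorter codes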
# Token: a lightweight record pairing a word string with its running count
class Token:
    def __init__(self, word):
        # Number of times the word has been observed
        self.count = 0
        # The word string itself
        self.word = word

    def __repr__(self):
        """A string representation of the token"""
        return "Token(word='{}', count={})".format(self.word, self.count)


# TFIDFEncoder: computes term frequency-inverse document frequency encodings
# for a text corpus
class TFIDFEncoder:def __init__(self,vocab=None,lowercase=True,min_count=0,smooth_idf=True,max_tokens=None,input_type="files",filter_stopwords=True,filter_punctuation=True,tokenizer="words",):# 初始化 TFIDFEncoder 对象的各种参数# 定义内部方法 _encode_document,用于对文档进行编码def _encode_document(self, doc, word2idx, idx2word, tokens, doc_count, bol_ix, eol_ix,):"""Perform tokenization and compute token counts for a single document"""# 获取超参数H = self.hyperparameters# 是否转换为小写lowercase = H["lowercase"]# 是否过滤停用词filter_stop = H["filter_stopwords"]# 是否过滤标点符号filter_punc = H["filter_punctuation"]# 如果输入类型为文件if H["input_type"] == "files":# 打开文件并读取内容with open(doc, "r", encoding=H["encoding"]) as handle:doc = handle.read()# 定义不同类型的分词器tokenizer_dict = {"words": tokenize_words,"characters": tokenize_chars,"whitespace": tokenize_whitespace,"bytes": tokenize_bytes_raw,}# 根据超参数选择相应的分词器tokenizer = tokenizer_dict[H["tokenizer"]]# 初始化单词数量n_words = 0# 将文档按行分割lines = doc.split("\n")# 遍历每一行for line in lines:# 对每一行进行分词words = tokenizer(line,lowercase=lowercase,filter_stopwords=filter_stop,filter_punctuation=filter_punc,encoding=H["encoding"],)# 过滤词汇表中不存在的词words = self._filter_vocab(words)# 更新单词数量n_words += len(words)# 遍历每个词for ww in words:# 如果词不在 word2idx 中,则添加if ww not in word2idx:word2idx[ww] = len(tokens)idx2word[len(tokens)] = wwtokens.append(Token(ww))# 获取词的索引t_idx = word2idx[ww]# 更新词频tokens[t_idx].count += 1# 更新文档中词的出现次数doc_count[t_idx] = doc_count.get(t_idx, 0) + 1# 在每行开头和结尾添加 <bol> 和 <eol> 标签tokens[bol_ix].count += 1tokens[eol_ix].count += 1doc_count[bol_ix] = doc_count.get(bol_ix, 0) + 1doc_count[eol_ix] = doc_count.get(eol_ix, 0) + 1# 返回单词到索引的映射、索引到单词的映射、单词列表、文档中单词出现次数return word2idx, idx2word, tokens, doc_count# 保留前 N 个最频繁出现的词汇def _keep_top_n_tokens(self):# 获取最大词汇数N = self.hyperparameters["max_tokens"]# 初始化词汇计数、词汇到索引、索引到词汇的字典doc_counts, word2idx, idx2word = {}, {}, {}# 根据词汇出现次数排序词汇列表tokens = sorted(self._tokens, key=lambda x: x.count, reverse=True)# 重新索引前 N 个词汇...unk_ix = Nonefor idx, tt in enumerate(tokens[:N]):word2idx[tt.word] = idxidx2word[idx] = tt.word# 如果 <unk> 不在前 N 个词汇中,将其添加进去,替换第 N 个最频繁出现的词汇,并相应调整 <unk> 的计数...if tt.word == "<unk>":unk_ix = idx# ... 最后,将所有被删除的词汇重新编码为 "<unk>"for tt in tokens[N:]:tokens[unk_ix].count += tt.count# ... 
最后,重新为每个文档重新索引词汇计数for d_ix in self.term_freq.keys():doc_counts[d_ix] = {}for old_ix, d_count in self.term_freq[d_ix].items():word = self.idx2token[old_ix]new_ix = word2idx.get(word, unk_ix)doc_counts[d_ix][new_ix] = doc_counts[d_ix].get(new_ix, 0) + d_count# 更新词汇列表、词汇到索引、索引到词汇的字典以及文档词频self._tokens = tokens[:N]self.token2idx = word2idxself.idx2token = idx2wordself.term_freq = doc_counts# 断言词汇列表长度不超过 Nassert len(self._tokens) <= Ndef _drop_low_freq_tokens(self):"""替换所有出现次数少于 `min_count` 的标记为 `<unk>` 标记。"""H = self.hyperparameters# 获取 `<unk>` 标记的索引unk_token = self._tokens[self.token2idx["<unk>"]]# 获取 `<eol>` 标记的索引eol_token = self._tokens[self.token2idx["<eol>"]]# 获取 `<bol>` 标记的索引bol_token = self._tokens[self.token2idx["<bol>"]]# 初始化特殊标记列表tokens = [unk_token, eol_token, bol_token]# 初始化 `<unk>` 标记的索引unk_idx = 0# 初始化特殊标记到索引的映射word2idx = {"<unk>": 0, "<eol>": 1, "<bol>": 2}# 初始化索引到特殊标记的映射idx2word = {0: "<unk>", 1: "<eol>", 2: "<bol>"}# 初始化特殊标记集合special = {"<eol>", "<bol>", "<unk>"}# 遍历所有标记for tt in self._tokens:# 如果标记不是特殊标记if tt.word not in special:# 如果标记出现次数小于 `min_count`if tt.count < H["min_count"]:# 将出现次数加到 `<unk>` 标记上tokens[unk_idx].count += tt.countelse:# 更新标记到索引的映射word2idx[tt.word] = len(tokens)# 更新索引到标记的映射idx2word[len(tokens)] = tt.word# 添加标记到列表中tokens.append(tt)# 重新索引文档计数doc_counts = {}for d_idx in self.term_freq.keys():doc_counts[d_idx] = {}for old_idx, d_count in self.term_freq[d_idx].items():word = self.idx2token[old_idx]new_idx = word2idx.get(word, unk_idx)doc_counts[d_idx][new_idx] = doc_counts[d_idx].get(new_idx, 0) + d_count# 更新标记列表self._tokens = tokens# 更新标记到索引的映射self.token2idx = word2idx# 更新索引到标记的映射self.idx2token = idx2word# 更新文档计数self.term_freq = doc_counts# 对 tokens 进行排序,按字母顺序排序并重新编码def _sort_tokens(self):# 初始化索引ix = 0# 初始化 token 到索引和索引到 token 的字典token2idx, idx2token, = ({},{},)# 特殊 token 列表special = ["<eol>", "<bol>", "<unk>"]# 对 token2idx 字典中的键进行排序words = sorted(self.token2idx.keys())# 初始化 term_freq 字典term_freq = {d: {} for d in self.term_freq.keys()}# 遍历排序后的 tokensfor w in words:# 如果当前 token 不在特殊 token 列表中if w not in special:# 获取当前 token 的旧索引old_ix = self.token2idx[w]# 更新 token2idx 和 idx2token 字典token2idx[w], idx2token[ix] = ix, w# 更新 term_freq 字典for d in self.term_freq.keys():if old_ix in self.term_freq[d]:count = self.term_freq[d][old_ix]term_freq[d][ix] = countix += 1# 处理特殊 tokenfor w in special:token2idx[w] = len(token2idx)idx2token[len(idx2token)] = w# 更新对象的 token2idx、idx2token、term_freq 和 vocab_counts 属性self.token2idx = token2idxself.idx2token = idx2tokenself.term_freq = term_freqself.vocab_counts = Counter({t.word: t.count for t in self._tokens})def _calc_idf(self):"""计算语料库中每个标记的(平滑的)逆文档频率。对于一个单词标记 `w`,IDF 简单地定义为IDF(w) = log ( |D| / |{ d in D: w in d }| ) + 1其中 D 是语料库中所有文档的集合,D = {d1, d2, ..., dD}如果 `smooth_idf` 为 True,我们对包含给定单词的文档数量进行加法平滑处理,相当于假设存在第 D+1 个文档,其中包含语料库中的每个单词:SmoothedIDF(w) = log ( |D| + 1 / [1 + |{ d in D: w in d }|] ) + 1"""inv_doc_freq = {}smooth_idf = self.hyperparameters["smooth_idf"]tf, doc_idxs = self.term_freq, self._idx2doc.keys()D = len(self._idx2doc) + int(smooth_idf)for word, w_ix in self.token2idx.items():d_count = int(smooth_idf)d_count += np.sum([1 if w_ix in tf[d_ix] else 0 for d_ix in doc_idxs])inv_doc_freq[w_ix] = 1 if d_count == 0 else np.log(D / d_count) + 1self.inv_doc_freq = inv_doc_freqdef transform(self, ignore_special_chars=True):"""生成文本语料库的词频-逆文档频率编码。Parameters----------ignore_special_chars : bool是否从最终的tfidf编码中删除与"<eol>", "<bol>", "<unk>"标记对应的列。默认为True。Returns-------tfidf : numpy array of shape `(D, M [- 
3])`编码后的语料库,每行对应一个文档,每列对应一个标记ID。如果`ignore_special_chars`为False,则在`idx2token`属性中存储列号与标记之间的映射。否则,映射不准确。"""D, N = len(self._idx2doc), len(self._tokens)# 初始化词频矩阵和逆文档频率矩阵tf = np.zeros((D, N))idf = np.zeros((D, N))# 遍历文档索引for d_ix in self._idx2doc.keys():# 获取文档中的词和词频words, counts = zip(*self.term_freq[d_ix].items())# 创建文档索引数组docs = np.ones(len(words), dtype=int) * d_ix# 更新词频矩阵tf[docs, words] = counts# 获取所有词的排序列表words = sorted(self.idx2token.keys())# 根据词的逆文档频率创建矩阵idf = np.tile(np.array([self.inv_doc_freq[w] for w in words]), (D, 1))# 计算tfidf矩阵tfidf = tf * idf# 如果忽略特殊字符if ignore_special_chars:# 获取特殊字符的索引idxs = [self.token2idx["<unk>"],self.token2idx["<eol>"],self.token2idx["<bol>"],]# 从tfidf矩阵中删除特殊字符列tfidf = np.delete(tfidf, idxs, 1)# 返回tfidf矩阵return tfidf
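# Stripped of the <bol>/<eol>/<unk> bookkeeping, transform() above computes a
# term-frequency matrix scaled column-wise by the (smoothed) inverse document
# frequency defined in _calc_idf. A minimal NumPy sketch of that computation on
# pre-tokenized documents (`tfidf_matrix` is a hypothetical helper, not the
# TFIDFEncoder API):
import numpy as np


def tfidf_matrix(docs, smooth_idf=True):
    """Return a (n_docs, n_vocab) tf-idf matrix and the sorted vocabulary."""
    vocab = sorted({w for doc in docs for w in doc})
    word2idx = {w: i for i, w in enumerate(vocab)}
    D, V = len(docs), len(vocab)

    # Term frequency: raw counts of each token in each document
    tf = np.zeros((D, V))
    for d, doc in enumerate(docs):
        for w in doc:
            tf[d, word2idx[w]] += 1

    # Smoothed IDF(w) = log((|D| + s) / (s + |{d : w in d}|)) + 1, with s = 1
    s = int(smooth_idf)
    doc_freq = (tf > 0).sum(axis=0)
    idf = np.log((D + s) / (s + doc_freq)) + 1

    return tf * idf, vocab


docs = [["the", "cat", "sat"], ["the", "dog", "sat", "sat"], ["a", "dog"]]
scores, vocab = tfidf_matrix(docs)
print(vocab, scores.round(2), sep="\n")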
# Vocabulary: compiles and encodes the unique tokens in a text corpus
class Vocabulary:# 初始化方法,设置类的属性def __init__(self,lowercase=True, # 是否将单词转换为小写,默认为Truemin_count=None, # 单词最小出现次数,默认为Nonemax_tokens=None, # 最大单词数量,默认为Nonefilter_stopwords=True, # 是否过滤停用词,默认为Truefilter_punctuation=True, # 是否过滤标点符号,默认为Truetokenizer="words", # 分词器类型,默认为"words"):"""用于编译和编码文本语料库中唯一标记的对象。参数----------lowercase : bool是否在标记化之前将每个字符串转换为小写。默认为 True。min_count : int标记必须出现的最小次数才能包含在词汇表中。如果为 `None`,则在词汇表中包含来自 `corpus_fp` 的所有标记。默认为 None。max_tokens : int仅将出现次数超过 `min_count` 的前 `max_tokens` 个最常见标记添加到词汇表中。如果为 None,则添加所有出现次数超过 `min_count` 的标记。默认为 None。filter_stopwords : bool是否在对语料库中的单词进行编码之前删除停用词。默认为 True。filter_punctuation : bool是否在对语料库中的单词进行编码之前删除标点符号。默认为 True。tokenizer : {'whitespace', 'words', 'characters', 'bytes'}在将字符串映射到标记时要遵循的策略。 `'whitespace'` 标记化器在空格字符处拆分字符串。`'words'` 标记化器使用“单词”正则表达式拆分字符串。`'characters'` 标记化器将字符串拆分为单个字符。`'bytes'` 标记化器将字符串拆分为一组单个字节。"""self.hyperparameters = {"id": "Vocabulary","encoding": None,"corpus_fps": None,"lowercase": lowercase,"min_count": min_count,"max_tokens": max_tokens,"filter_stopwords": filter_stopwords,"filter_punctuation": filter_punctuation,"tokenizer": tokenizer,}def __len__(self):"""返回词汇表中标记的数量"""return len(self._tokens)# 返回一个迭代器,用于遍历词汇表中的标记def __iter__(self):return iter(self._tokens)# 判断给定的单词是否是词汇表中的一个标记def __contains__(self, word):return word in self.token2idx# 根据键返回词汇表中的标记(如果键是整数)或索引(如果键是字符串)def __getitem__(self, key):if isinstance(key, str):return self._tokens[self.token2idx[key]]if isinstance(key, int):return self._tokens[key]# 返回词汇表中唯一单词标记的数量@propertydef n_tokens(self):return len(self.token2idx)# 返回语料库中单词的总数@propertydef n_words(self):return sum(self.counts.values())# 返回词汇表中唯一单词标记的形状@propertydef shape(self):return self._tokens.shape# 返回语料库中出现频率最高的前n个标记def most_common(self, n=5):return self.counts.most_common()[:n]# 返回在语料库中出现k次的所有标记def words_with_count(self, k):return [w for w, c in self.counts.items() if c == k]def filter(self, words, unk=True): # noqa: A003"""Filter (or replace) any word in `words` that is not present in`Vocabulary`.Parameters----------words : list of strsA list of words to filterunk : boolWhether to replace any out of vocabulary words in `words` with the``<unk>`` token (True) or skip them entirely (False). Default isTrue.Returns-------filtered : list of strsThe list of words filtered against the words in Vocabulary."""# 如果 unk 为 True,则将不在 Vocabulary 中的单词替换为 "<unk>",否则跳过if unk:return [w if w in self else "<unk>" for w in words]# 如果 unk 为 False,则只保留在 Vocabulary 中的单词return [w for w in words if w in self]def words_to_indices(self, words):"""Convert the words in `words` to their token indices. If a word is notin the vocabulary, return the index for the ``<unk>`` tokenParameters----------words : list of strsA list of words to filterReturns-------indices : list of intsThe token indices for each word in `words`"""# 获取 "<unk>" 的索引unk_ix = self.token2idx["<unk>"]# 获取是否转换为小写的设置lowercase = self.hyperparameters["lowercase"]# 如果需要转换为小写,则将单词列表中的单词转换为小写words = [w.lower() for w in words] if lowercase else words# 将单词转换为它们在词汇表中的索引,如果不在词汇表中,则返回 "<unk>" 的索引return [self.token2idx[w] if w in self else unk_ix for w in words]def indices_to_words(self, indices):"""Convert the indices in `indices` to their word values. 
If an index isnot in the vocabulary, return the ``<unk>`` token.Parameters----------indices : list of intsThe token indices for each word in `words`Returns-------words : list of strsThe word strings corresponding to each token index in `indices`"""# 设置 "<unk>" 标记unk = "<unk>"# 将索引转换为对应的单词,如果索引不在词汇表中,则返回 "<unk>"return [self.idx2token[i] if i in self.idx2token else unk for i in indices]# 保留词汇表中出现频率最高的前 N 个词的索引def _keep_top_n_tokens(self):# 初始化空字典,用于存储词汇表中词语到索引的映射关系word2idx, idx2word = {}, {}# 获取最大词汇量 NN = self.hyperparameters["max_tokens"]# 根据词频对词汇表中的词进行排序tokens = sorted(self._tokens, key=lambda x: x.count, reverse=True)# 重新索引前 N 个词...unk_ix = Nonefor idx, tt in enumerate(tokens[:N]):# 将词语和对应的索引存入字典中word2idx[tt.word] = idxidx2word[idx] = tt.word# 如果词语是 "<unk>",记录其索引if tt.word == "<unk>":unk_ix = idx# ... 如果 "<unk>" 不在前 N 个词中,将其添加进去,替换第 N 个最常见的词,并相应调整 "<unk>" 的计数 ...if unk_ix is None:unk_ix = self.token2idx["<unk>"]old_count = tokens[N - 1].counttokens[N - 1] = self._tokens[unk_ix]tokens[N - 1].count += old_countword2idx["<unk>"] = N - 1idx2word[N - 1] = "<unk>"# ... 将所有被删除的词重新编码为 "<unk>"for tt in tokens[N:]:tokens[unk_ix].count += tt.count# 更新词汇表为前 N 个词self._tokens = tokens[:N]self.token2idx = word2idxself.idx2token = idx2word# 断言词汇表长度不超过 Nassert len(self._tokens) <= Ndef _drop_low_freq_tokens(self):"""Replace all tokens that occur less than `min_count` with the `<unk>`token."""# 获取 `<unk>` token 的索引unk_idx = 0# 获取 `<unk>`、`<eol>`、`<bol>` token 对应的索引unk_token = self._tokens[self.token2idx["<unk>"]]eol_token = self._tokens[self.token2idx["<eol>"]]bol_token = self._tokens[self.token2idx["<bol>"]]# 获取超参数H = self.hyperparameters# 初始化特殊 token 列表tokens = [unk_token, eol_token, bol_token]# 初始化特殊 token 到索引的映射word2idx = {"<unk>": 0, "<eol>": 1, "<bol>": 2}# 初始化索引到特殊 token 的映射idx2word = {0: "<unk>", 1: "<eol>", 2: "<bol>"}# 特殊 token 集合special = {"<eol>", "<bol>", "<unk>"}# 遍历所有 tokenfor tt in self._tokens:# 如果 token 不是特殊 tokenif tt.word not in special:# 如果 token 出现次数小于 min_countif tt.count < H["min_count"]:# 将出现次数小于 min_count 的 token 替换为 `<unk>` tokentokens[unk_idx].count += tt.countelse:# 更新 token 到索引的映射word2idx[tt.word] = len(tokens)# 更新索引到 token 的映射idx2word[len(tokens)] = tt.word# 添加当前 token 到 tokens 列表中tokens.append(tt)# 更新 tokens 列表self._tokens = tokens# 更新 token 到索引的映射self.token2idx = word2idx# 更新索引到 token 的映射self.idx2token = idx2word
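# A minimal sketch of the token/index bookkeeping that Vocabulary maintains,
# with out-of-vocabulary words mapped to "<unk>". Illustrative only: it ignores
# the min_count / max_tokens filtering and the <bol>/<eol> markers, and the
# helper names are hypothetical:
from collections import Counter


def build_vocab(tokenized_docs):
    """Assign an integer id to every unique token, reserving id 0 for "<unk>"."""
    counts = Counter(w for doc in tokenized_docs for w in doc)
    token2idx = {"<unk>": 0}
    for w in sorted(counts):
        token2idx.setdefault(w, len(token2idx))
    idx2token = {i: w for w, i in token2idx.items()}
    return token2idx, idx2token, counts


def words_to_indices(words, token2idx):
    """Out-of-vocabulary words fall back to the "<unk>" id."""
    return [token2idx.get(w, token2idx["<unk>"]) for w in words]


token2idx, idx2token, counts = build_vocab([["the", "cat", "sat"], ["the", "dog"]])
print(words_to_indices(["the", "platypus"], token2idx))  # second id falls back to <unk>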
Preprocessing
The preprocessing module implements common data preprocessing routines.
- nlp.py: Routines and objects for handling text data.
- n-gram generators
- Word and character tokenization
- Punctuation and stop-word removal
- Vocabulary / unigram count objects
- Byte-pair encoding (Gage, 1994; Sennrich, Haddow, & Birch, 2015)
- Huffman tree encoding / decoding
- Term frequency-inverse document frequency (tf-idf) encoding
- dsp.py: Routines for handling audio and image data.
- Signal windowing
- Signal autocorrelation
- Discrete Fourier transform
- Discrete cosine transform (type II)
- Signal resampling via (bi-)linear interpolation and nearest neighbor
- Mel-frequency cepstral coefficients (MFCCs) (Mermelstein, 1976; Davis & Mermelstein, 1980)
- general.py: General data preprocessing objects and functions.
- Feature hashing (Moody, 1989)
- Mini-batch generators
- One-hot encoding / decoding
- Feature standardization
numpy-ml\numpy_ml\preprocessing\__init__.py
# Import the general, nlp, and dsp submodules from the current package
from . import general
from . import nlp
from . import dsp
Models
This repo includes code for the following models:
- Gaussian mixture model
- EM training
- Hidden Markov model
- Viterbi decoding
- Likelihood computation
- MLE parameter estimation via Baum-Welch/forward-backward algorithm
- Latent Dirichlet allocation (topic model)
- Standard model with MLE parameter estimation via variational EM
- Smoothed model with MAP parameter estimation via MCMC
- Neural networks
- Layers / Layer-wise ops
- Add
- Flatten
- Multiply
- Softmax
- Fully-connected/Dense
- Sparse evolutionary connections
- LSTM
- Elman-style RNN
- Max + average pooling
- Dot-product attention
- Embedding layer
- Restricted Boltzmann machine (w. CD-n training)
- 2D deconvolution (w. padding and stride)
- 2D convolution (w. padding, dilation, and stride)
- 1D convolution (w. padding, dilation, stride, and causality)
- Modules
- Bidirectional LSTM
- ResNet-style residual blocks (identity and convolution)
- WaveNet-style residual blocks with dilated causal convolutions
- Transformer-style multi-headed scaled dot product attention
- Regularizers
- Dropout
- Normalization
- Batch normalization (spatial and temporal)
- Layer normalization (spatial and temporal)
- Optimizers
- SGD w/ momentum
- AdaGrad
- RMSProp
- Adam
- Learning Rate Schedulers
- Constant
- Exponential
- Noam/Transformer
- Dlib scheduler
- Weight Initializers
- Glorot/Xavier uniform and normal
- He/Kaiming uniform and normal
- Standard and truncated normal
- Losses
- Cross entropy
- Squared error
- Bernoulli VAE loss
- Wasserstein loss with gradient penalty
- Noise contrastive estimation loss
- Activations
- ReLU
- Tanh
- Affine
- Sigmoid
- Leaky ReLU
- ELU
- SELU
- Exponential
- Hard Sigmoid
- Softplus
- Models
- Bernoulli variational autoencoder
- Wasserstein GAN with gradient penalty
- word2vec encoder with skip-gram and CBOW architectures
- Utilities
- col2im (MATLAB port)
- im2col (MATLAB port)
- conv1D
- conv2D
- deconv2D
- minibatch
- Tree-based models
- Decision trees (CART)
- [Bagging] Random forests
- [Boosting] Gradient-boosted decision trees
- Linear models
- Ridge regression
- Logistic regression
- Ordinary least squares
- Gaussian naive Bayes classifier
- Generalized linear model (identity, log, and logit links)
- Bayesian linear regression w/ conjugate priors
- Unknown mean, known variance (Gaussian prior)
- Unknown mean, unknown variance (Normal-Gamma / Normal-Inverse-Wishart prior)
- n-Gram sequence models
- Maximum likelihood scores
- Additive/Lidstone smoothing
- Simple Good-Turing smoothing
- Multi-armed bandit models
- UCB1
- LinUCB
- Epsilon-greedy
- Thompson sampling w/ conjugate priors
- Beta-Bernoulli sampler
- LinUCB
- Reinforcement learning models
- Cross-entropy method agent
- First visit on-policy Monte Carlo agent
- Weighted incremental importance sampling Monte Carlo agent
- Expected SARSA agent
- TD-0 Q-learning agent
- Dyna-Q / Dyna-Q+ with prioritized sweeping
- Nonparametric models
- Nadaraya-Watson kernel regression
- k-Nearest neighbors classification and regression
- Gaussian process regression
- Matrix factorization
- Regularized alternating least-squares
- Non-negative matrix factorization
- Preprocessing
- Discrete Fourier transform (1D signals)
- Discrete cosine transform (type-II) (1D signals)
- Bilinear interpolation (2D signals)
- Nearest neighbor interpolation (1D and 2D signals)
- Autocorrelation (1D signals)
- Signal windowing
- Text tokenization
- Feature hashing
- Feature standardization
- One-hot encoding / decoding
- Huffman coding / decoding
- Byte pair encoding / decoding
- Term frequency-inverse document frequency (TF-IDF) encoding
- MFCC encoding
- Utilities
- Similarity kernels
- Distance metrics
- Priority queue
- Ball tree
- Discrete sampler
- Graph processing and generators
numpy-ml\numpy_ml\rl_models\agents.py
# Import the required libraries
from abc import ABC, abstractmethod
from collections import defaultdict

import numpy as np

# EnvModel, env_stats, and tile_state_space come from the local rl_utils module
from .rl_utils import EnvModel, env_stats, tile_state_space

# Dict comes from the package's data-structure utilities
from ..utils.data_structures import Dict


# AgentBase: abstract base class shared by all of the agents below
class AgentBase(ABC):# 初始化 AgentBase 类def __init__(self, env):super().__init__()self.env = envself.parameters = {}self.hyperparameters = {}self.derived_variables = {}self.env_info = env_stats(env)# 创建观测和动作的映射字典def _create_2num_dicts(self, obs_encoder=None, act_encoder=None):E = self.env_infon_states = np.prod(E["n_obs_per_dim"])n_actions = np.prod(E["n_actions_per_dim"])# 创建动作到标量的字典和标量到动作的字典self._num2action = Dict()self._action2num = Dict(act_encoder)if n_actions != np.inf:self._action2num = {act: i for i, act in enumerate(E["action_ids"])}self._num2action = {i: act for act, i in self._action2num.items()}# 创建观测到标量的字典和标量到观测的字典self._num2obs = Dict()self._obs2num = Dict(obs_encoder)if n_states != np.inf:self._obs2num = {act: i for i, act in enumerate(E["obs_ids"])}self._num2obs = {i: act for act, i in self._obs2num.items()}# 清空历史记录def flush_history(self):"""Clear the episode history"""for k, v in self.episode_history.items():self.episode_history[k] = []# 抽象方法,根据当前观测生成动作@abstractmethoddef act(self, obs):"""Generate an action given the current observation"""raise NotImplementedError# 抽象方法,采取贪婪策略@abstractmethoddef greedy_policy(self, **kwargs):"""Take a greedy action.Returns-------total_reward : floatThe total reward on the episode.n_steps : floatThe total number of steps taken on the episode."""raise NotImplementedError@abstractmethod# 定义一个方法,用于运行 agent 在一个单独的 episode 上def run_episode(self, max_steps, render=False):"""Run the agent on a single episode.Parameters----------max_steps : intThe maximum number of steps to run an episoderender : boolWhether to render the episode during trainingReturns-------reward : floatThe total reward on the episode, averaged over the theta samples.steps : floatThe total number of steps taken on the episode, averaged over thetheta samples."""# 抛出未实现的错误,需要在子类中实现该方法raise NotImplementedError# 定义一个抽象方法,用于更新 agent 的参数根据当前 episode 上获得的奖励@abstractmethoddef update(self):r"""Update the agent parameters according to the rewards accrued on thecurrent episode.Returns-------avg_reward : floatThe average reward earned by the best `retain_prcnt` theta sampleson the current episode."""# 抛出未实现的错误,需要在子类中实现该方法raise NotImplementedError
class CrossEntropyAgent(AgentBase):# 定义交叉熵代理类,继承自AgentBase基类def _init_params(self):# 初始化参数方法E = self.env_info# 获取环境信息assert not E["continuous_actions"], "Action space must be discrete"# 断言动作空间必须是离散的self._create_2num_dicts()# 调用私有方法创建两个数字字典b_len = np.prod(E["n_actions_per_dim"])# 计算动作维度的乘积作为b_lenW_len = b_len * np.prod(E["obs_dim"])# 计算观测维度的乘积与b_len相乘作为W_lentheta_dim = b_len + W_len# 计算theta的维度# init mean and variance for mv gaussian with dimensions theta_dim# 初始化维度为theta_dim的多变量高斯分布的均值和方差theta_mean = np.random.rand(theta_dim)# 生成theta_dim维度的随机均值theta_var = np.ones(theta_dim)# 生成theta_dim维度的方差为1的数组self.parameters = {"theta_mean": theta_mean, "theta_var": theta_var}# 设置参数字典包含均值和方差self.derived_variables = {"b_len": b_len,"W_len": W_len,"W_samples": [],"b_samples": [],"episode_num": 0,"cumulative_rewards": [],}# 设置派生变量字典包含b_len、W_len、W_samples、b_samples、episode_num和cumulative_rewardsself.hyperparameters = {"agent": "CrossEntropyAgent","retain_prcnt": self.retain_prcnt,"n_samples_per_episode": self.n_samples_per_episode,}# 设置超参数字典包含代理名称、保留百分比和每个episode的样本数self.episode_history = {"rewards": [], "state_actions": []}# 设置episode历史字典包含奖励和状态动作对def act(self, obs):r"""Generate actions according to a softmax policy.Notes-----The softmax policy assumes that the pmf over actions in state :math:`x_t` isgiven by:.. math::\pi(a | x^{(t)}) = \text{softmax}(\text{obs}^{(t)} \cdot \mathbf{W}_i^{(t)} + \mathbf{b}_i^{(t)} )where :math:`\mathbf{W}` is a learned weight matrix, `obs` is the observationat timestep `t`, and **b** is a learned bias vector.Parameters----------obs : int or :py:class:`ndarray <numpy.ndarray>`An observation from the environment.Returns-------action : int, float, or :py:class:`ndarray <numpy.ndarray>`An action sampled from the distribution over actions defined by thesoftmax policy."""E, P = self.env_info, self.parametersW, b = P["W"], P["b"]s = self._obs2num[obs]s = np.array([s]) if E["obs_dim"] == 1 else s# compute softmax# 计算 softmax 分布的分子部分Z = s.T @ W + b# 对分子部分进行指数化,减去最大值以防止数值不稳定e_Z = np.exp(Z - np.max(Z, axis=-1, keepdims=True))# 计算 softmax 分布action_probs = e_Z / e_Z.sum(axis=-1, keepdims=True)# sample action# 从 softmax 分布中采样一个动作a = np.random.multinomial(1, action_probs).argmax()# 返回对应动作的编号return self._num2action[a]# 运行智能体在单个 episode 上的操作def run_episode(self, max_steps, render=False):"""Run the agent on a single episode.Parameters----------max_steps : intThe maximum number of steps to run an episoderender : boolWhether to render the episode during trainingReturns-------reward : floatThe total reward on the episode, averaged over the theta samples.steps : floatThe total number of steps taken on the episode, averaged over thetheta samples."""# 从 theta 样本中采样self._sample_thetas()# 获取环境信息和派生变量E, D = self.env_info, self.derived_variablesn_actions = np.prod(E["n_actions_per_dim"])W_len, obs_dim = D["W_len"], E["obs_dim"]steps, rewards = [], []# 遍历 theta 样本for theta in D["theta_samples"]:W = theta[:W_len].reshape(obs_dim, n_actions)b = theta[W_len:]# 运行 episode,获取总奖励和步数total_rwd, n_steps = self._episode(W, b, max_steps, render)rewards.append(total_rwd)steps.append(n_steps)# 返回当前 episode 所有样本的平均奖励和平均步数D["episode_num"] += 1D["cumulative_rewards"] = rewardsreturn np.mean(D["cumulative_rewards"]), np.mean(steps)def _episode(self, W, b, max_steps, render):"""Run the agent for an episode.Parameters----------W : :py:class:`ndarray <numpy.ndarray>` of shape `(obs_dim, n_actions)`The weights for the softmax policy.b : :py:class:`ndarray <numpy.ndarray>` of shape `(bias_len, )`The bias for the softmax 
policy.max_steps : intThe maximum number of steps to run the episode.render : boolWhether to render the episode during training.Returns-------reward : floatThe total reward on the episode.steps : floatThe total number of steps taken on the episode."""# 初始化奖励列表和状态-动作对列表rwds, sa = [], []# 获取当前 episode 的历史记录H = self.episode_history# 初始化总奖励和步数total_reward, n_steps = 0.0, 1# 重置环境并获取初始观察obs = self.env.reset()# 更新策略参数self.parameters["W"] = Wself.parameters["b"] = b# 循环执行每一步for i in range(max_steps):# 如果需要渲染环境,则进行渲染if render:self.env.render()# 增加步数计数n_steps += 1# 根据当前观察选择动作action = self.act(obs)# 将观察和动作转换为数字编码s, a = self._obs2num[obs], self._action2num[action]sa.append((s, a))# 执行动作,获取下一个观察和奖励obs, reward, done, _ = self.env.step(action)rwds.append(reward)total_reward += reward# 如果 episode 结束,则跳出循环if done:break# 将奖励列表和状态-动作对列表添加到历史记录中H["rewards"].append(rwds)H["state_actions"].append(sa)# 返回总奖励和步数return total_reward, n_steps# 更新 mu 和 Sigma,根据当前 episode 中获得的奖励def update(self):# 获取派生变量和参数D, P = self.derived_variables, self.parameters# 计算需要保留的样本数量n_retain = int(self.retain_prcnt * self.n_samples_per_episode)# 对每个 theta 样本的累积奖励进行排序,从大到小sorted_y_val_idxs = np.argsort(D["cumulative_rewards"])[::-1]top_idxs = sorted_y_val_idxs[:n_retain]# 使用最佳 theta 值更新 theta_mean 和 theta_varP["theta_mean"] = np.mean(D["theta_samples"][top_idxs], axis=0)P["theta_var"] = np.var(D["theta_samples"][top_idxs], axis=0)# 从具有均值为 theta_mean 和协方差为 diag(theta_var) 的多元高斯分布中采样 n_samples_per_episode 个 thetadef _sample_thetas(self):P, N = self.parameters, self.n_samples_per_episodeMu, Sigma = P["theta_mean"], np.diag(P["theta_var"])# 从多元高斯分布中生成样本samples = np.random.multivariate_normal(Mu, Sigma, N)# 将生成的样本保存在派生变量中self.derived_variables["theta_samples"] = samples# 定义一个贪婪策略函数,使用当前代理参数执行def greedy_policy(self, max_steps, render=True):"""Execute a greedy policy using the current agent parameters.Parameters----------max_steps : intThe maximum number of steps to run the episode.render : boolWhether to render the episode during execution.Returns-------total_reward : floatThe total reward on the episode.n_steps : floatThe total number of steps taken on the episode."""# 获取环境信息、派生变量和参数E, D, P = self.env_info, self.derived_variables, self.parameters# 获取参数中的均值和方差Mu, Sigma = P["theta_mean"], np.diag(P["theta_var"])# 从多元正态分布中采样一个样本sample = np.random.multivariate_normal(Mu, Sigma, 1)# 获取权重矩阵的长度和观测维度W_len, obs_dim = D["W_len"], E["obs_dim"]# 计算动作空间的维度n_actions = np.prod(E["n_actions_per_dim"])# 从样本中提取权重矩阵和偏置向量W = sample[0, :W_len].reshape(obs_dim, n_actions)b = sample[0, W_len:]# 执行一个 episode,返回总奖励和步数total_reward, n_steps = self._episode(W, b, max_steps, render)# 返回总奖励和步数return total_reward, n_steps
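# Stripped of the environment plumbing, CrossEntropyAgent's update is the
# classic cross-entropy method: sample parameter vectors from a diagonal
# Gaussian, score each one, and refit the Gaussian to the top retain_prcnt
# fraction. A minimal sketch on a toy objective (`score` stands in for an
# episode's total reward; the helper name is hypothetical):
import numpy as np


def cross_entropy_method(score, theta_dim, n_samples=50, retain_prcnt=0.2, n_iters=30):
    """Return the Gaussian mean after iterating the sample / select / refit loop."""
    theta_mean = np.random.rand(theta_dim)
    theta_var = np.ones(theta_dim)
    n_retain = int(retain_prcnt * n_samples)

    for _ in range(n_iters):
        # Sample candidate parameter vectors from N(theta_mean, diag(theta_var))
        samples = np.random.multivariate_normal(theta_mean, np.diag(theta_var), n_samples)
        rewards = np.array([score(theta) for theta in samples])
        # Keep the best-scoring samples and refit the mean / variance to them
        top = samples[np.argsort(rewards)[::-1][:n_retain]]
        theta_mean, theta_var = top.mean(axis=0), top.var(axis=0)
    return theta_mean


# Toy objective: reward is maximized when theta equals the all-ones vector
best = cross_entropy_method(lambda th: -np.sum((th - 1.0) ** 2), theta_dim=5)
print(best.round(2))  # approaches [1. 1. 1. 1. 1.]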
class MonteCarloAgent(AgentBase):# 定义一个 Monte-Carlo 学习代理类,继承自 AgentBase 类def __init__(self, env, off_policy=False, temporal_discount=0.9, epsilon=0.1):"""A Monte-Carlo learning agent trained using either first-visit MonteCarlo updates (on-policy) or incremental weighted importance sampling(off-policy).Parameters----------env : :class:`gym.wrappers` or :class:`gym.envs` instanceThe environment to run the agent on.off_policy : boolWhether to use a behavior policy separate from the target policyduring training. If False, use the same epsilon-soft policy forboth behavior and target policies. Default is False.temporal_discount : float between [0, 1]The discount factor used for downweighting future rewards. Smallervalues result in greater discounting of future rewards. Default is0.9.epsilon : float between [0, 1]The epsilon value in the epsilon-soft policy. Larger valuesencourage greater exploration during training. Default is 0.1."""# 初始化 MonteCarloAgent 类的实例super().__init__(env)# 设置 epsilon 值self.epsilon = epsilon# 设置是否使用 off-policyself.off_policy = off_policy# 设置时间折扣因子self.temporal_discount = temporal_discount# 初始化参数self._init_params()# 初始化参数def _init_params(self):# 获取环境信息E = self.env_info# 确保动作空间是离散的assert not E["continuous_actions"], "Action space must be discrete"# 确保观察空间是离散的assert not E["continuous_observations"], "Observation space must be discrete"# 计算状态数量n_states = np.prod(E["n_obs_per_dim"])# 计算动作数量n_actions = np.prod(E["n_actions_per_dim"])# 创建状态和动作的映射字典self._create_2num_dicts()# 行为策略是随机的,epsilon-soft策略self.behavior_policy = self.target_policy = self._epsilon_soft_policy# 如果是离策略学习if self.off_policy:# 初始化C矩阵self.parameters["C"] = np.zeros((n_states, n_actions))# 目标策略是确定性的,贪婪策略self.target_policy = self._greedy# 初始化Q函数self.parameters["Q"] = np.random.rand(n_states, n_actions)# 初始化每个状态-动作对的回报对象self.derived_variables = {"returns": {(s, a): [] for s in range(n_states) for a in range(n_actions)},"episode_num": 0,}# 设置超参数self.hyperparameters = {"agent": "MonteCarloAgent","epsilon": self.epsilon,"off_policy": self.off_policy,"temporal_discount": self.temporal_discount,}# 初始化历史记录self.episode_history = {"state_actions": [], "rewards": []}# 定义一个贪婪行为策略函数,用于在离策略为真时使用def _greedy(self, s, a=None):"""A greedy behavior policy.Notes-----Only used when off-policy is True.Parameters----------s : int, float, or tupleThe state number for the current observation, as returned by``self._obs2num[obs]``.a : int, float, or tupleThe action number in the current state, as returned by``self._action2num[obs]``. If None, sample an action from the actionprobabilities in state `s`, otherwise, return the probability ofaction `a` under the greedy policy. Default is None.Returns-------action : int, float, or :py:class:`ndarray <numpy.ndarray>`If `a` is None, this is an action sampled from the distributionover actions defined by the greedy policy. If `a` is notNone, this is the probability of `a` under the greedy policy."""# 根据状态 s 对应的 Q 值,找到最大值对应的动作a_star = self.parameters["Q"][s, :].argmax()# 如果 a 为 None,则从贪婪策略中的动作概率分布中随机选择一个动作if a is None:out = self._num2action[a_star]# 如果 a 不为 None,则返回 a 在贪婪策略下的概率else:out = 1 if a == a_star else 0# 返回结果return out# 更新 Q 函数,使用基于策略的首次访问蒙特卡洛更新def _on_policy_update(self):r"""Update the `Q` function using an on-policy first-visit Monte Carloupdate.Notes-----The on-policy first-visit Monte Carlo update is.. 
math::Q'(s, a) \leftarrow\text{avg}(\text{reward following first visit to } (s, a)\text{ across all episodes})RL agents seek to learn action values conditional on subsequent optimalbehavior, but they need to behave non-optimally in order to explore allactions (to find the optimal actions).The on-policy approach is a compromise -- it learns action values notfor the optimal policy, but for a *near*-optimal policy that stillexplores (the epsilon-soft policy)."""# 获取派生变量、参数和历史记录D, P, HS = self.derived_variables, self.parameters, self.episode_history# 获取历史记录中的奖励和状态-动作对ep_rewards = HS["rewards"]sa_tuples = set(HS["state_actions"])# 找到每个状态-动作对第一次出现的位置locs = [HS["state_actions"].index(sa) for sa in sa_tuples]# 计算每个状态-动作对的累积回报cumulative_returns = [np.sum(ep_rewards[i:]) for i in locs]# 使用首次访问回报的平均值更新 Q 值for (s, a), cr in zip(sa_tuples, cumulative_returns):# 将首次访问回报添加到返回值列表中D["returns"][(s, a)].append(cr)# 更新 Q 值为返回值列表的平均值P["Q"][s, a] = np.mean(D["returns"][(s, a)])def _off_policy_update(self):"""Update `Q` using weighted importance sampling.Notes-----In importance sampling updates, we account for the fact that we areupdating a different policy from the one we used to generate behaviorby weighting the accumulated rewards by the ratio of the probability ofthe trajectory under the target policy versus its probability underthe behavior policies. This is known as the importance sampling weight.In weighted importance sampling, we scale the accumulated rewards for atrajectory by their importance sampling weight, then take the*weighted* average using the importance sampling weight. This weightedaverage then becomes the value for the trajectory.W = importance sampling weightG_t = total discounted reward from time t until episode endC_n = sum of importance weights for the first n rewardsThis algorithm converges to Q* in the limit."""P = self.parametersHS = self.episode_historyep_rewards = HS["rewards"]T = len(ep_rewards)G, W = 0.0, 1.0# 从最后一个时间步开始向前遍历for t in reversed(range(T)):s, a = HS["state_actions"][t]# 计算从时间步 t 开始到结束的总折扣奖励G = self.temporal_discount * G + ep_rewards[t]# 更新状态动作对 (s, a) 的重要性权重和P["C"][s, a] += W# 使用加权重要性采样更新 Q(s, a)P["Q"][s, a] += (W / P["C"][s, a]) * (G - P["Q"][s, a])# 将重要性采样比率乘以当前权重W *= self.target_policy(s, a) / self.behavior_policy(s, a)# 如果权重为零,则终止循环if W == 0.0:break# 定义一个方法,用于执行行为策略,生成训练过程中的动作def act(self, obs):r"""Execute the behavior policy--an :math:`\epsilon`-soft policy used togenerate actions during training.Parameters----------obs : int, float, or :py:class:`ndarray <numpy.ndarray>` as returned by ``env.step(action)``An observation from the environment.Returns-------action : int, float, or :py:class:`ndarray <numpy.ndarray>`An action sampled from the distribution over actions defined by theepsilon-soft policy.""" # noqa: E501# 将观察值转换为数字s = self._obs2num[obs]# 调用行为策略方法,返回动作return self.behavior_policy(s)# 运行一个单独的 episodedef run_episode(self, max_steps, render=False):"""Run the agent on a single episode.Parameters----------max_steps : intThe maximum number of steps to run an episode.render : boolWhether to render the episode during training.Returns-------reward : floatThe total reward on the episode.steps : floatThe number of steps taken on the episode."""# 获取派生变量D = self.derived_variables# 运行 episode,获取总奖励和步数total_rwd, n_steps = self._episode(max_steps, render)# 更新 episode 数量D["episode_num"] += 1# 返回总奖励和步数return total_rwd, n_stepsdef _episode(self, max_steps, render):"""Execute agent on an episode.Parameters----------max_steps : intThe maximum number of steps to run the 
episode.render : boolWhether to render the episode during training.Returns-------reward : floatThe total reward on the episode.steps : floatThe number of steps taken on the episode."""# 重置环境并获取初始观察值obs = self.env.reset()# 获取当前 episode 的历史记录HS = self.episode_history# 初始化总奖励和步数total_reward, n_steps = 0.0, 0# 循环执行每一步直到达到最大步数for i in range(max_steps):# 如果需要渲染,则显示环境if render:self.env.render()# 增加步数计数n_steps += 1# 根据当前观察值选择动作action = self.act(obs)# 将观察值和动作转换为数字s = self._obs2num[obs]a = self._action2num[action]# 存储 (状态, 动作) 元组HS["state_actions"].append((s, a))# 执行动作并获取奖励等信息obs, reward, done, info = self.env.step(action)# 记录奖励HS["rewards"].append(reward)total_reward += reward# 如果 episode 结束,则跳出循环if done:break# 返回总奖励和步数return total_reward, n_stepsdef update(self):"""Update the parameters of the model following the completion of anepisode. Flush the episode history after the update is complete."""# 获取超参数H = self.hyperparameters# 如果是离线策略更新,则调用离线策略更新方法if H["off_policy"]:self._off_policy_update()else:# 否则调用在线策略更新方法self._on_policy_update()# 清空 episode 历史记录self.flush_history()def greedy_policy(self, max_steps, render=True):"""Execute a greedy policy using the current agent parameters.Parameters----------max_steps : intThe maximum number of steps to run the episode.render : boolWhether to render the episode during execution.Returns-------total_reward : floatThe total reward on the episode.n_steps : floatThe total number of steps taken on the episode."""# 获取当前的 episode 历史记录H = self.episode_history# 重置环境并获取初始观察值obs = self.env.reset()# 初始化总奖励和步数total_reward, n_steps = 0.0, 0# 循环执行最大步数for i in range(max_steps):# 如果需要渲染环境,则进行渲染if render:self.env.render()# 增加步数计数n_steps += 1# 根据当前观察值执行贪婪策略选择动作action = self._greedy(obs)# 将观察值和动作转换为数字表示s = self._obs2num[obs]a = self._action2num[action]# 存储 (状态, 动作) 元组H["state_actions"].append((s, a))# 执行动作obs, reward, done, info = self.env.step(action)# 记录奖励H["rewards"].append(reward)total_reward += reward# 如果 episode 结束,则跳出循环if done:break# 返回总奖励和步数return total_reward, n_steps
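# The off-policy backup in _off_policy_update follows the weighted importance
# sampling recursion spelled out in its docstring: walk the episode backwards,
# accumulate the discounted return G and the importance weight W, and nudge
# Q[s, a] toward G with step size W / C[s, a]. A standalone sketch (the
# `target_prob` / `behavior_prob` callables are hypothetical stand-ins for the
# agent's target and behavior policies):
def weighted_is_backup(episode, Q, C, target_prob, behavior_prob, gamma=0.9):
    """One backward pass of weighted importance sampling over an episode.

    `episode` is a list of (state, action, reward) tuples; `Q` and `C` are
    dicts keyed by (state, action).
    """
    G, W = 0.0, 1.0
    for s, a, r in reversed(episode):
        G = gamma * G + r                    # discounted return from time t on
        C[(s, a)] = C.get((s, a), 0.0) + W   # cumulative importance weights
        q = Q.get((s, a), 0.0)
        Q[(s, a)] = q + (W / C[(s, a)]) * (G - q)
        W *= target_prob(s, a) / behavior_prob(s, a)
        if W == 0.0:                         # earlier timesteps contribute nothing further
            break
    return Q, C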
# TemporalDifferenceAgent: a temporal-difference (TD(0)) learning agent, subclass of AgentBase
class TemporalDifferenceAgent(AgentBase):# 初始化函数,接受环境、学习率、探索率、瓦片数、观测最大值、观测最小值、网格维度、是否离线策略、时间折扣等参数def __init__(self,env,lr=0.4,epsilon=0.1,n_tilings=8,obs_max=None,obs_min=None,grid_dims=[8, 8],off_policy=False,temporal_discount=0.99,):# 初始化参数函数def _init_params(self):# 获取环境信息E = self.env_info# 断言动作空间必须是离散的assert not E["continuous_actions"], "Action space must be discrete"obs_encoder = None# 如果观测空间是连续的if E["continuous_observations"]:# 对观测空间进行编码obs_encoder, _ = tile_state_space(self.env,self.env_info,self.n_tilings,state_action=False,obs_max=self.obs_max,obs_min=self.obs_min,grid_size=self.grid_dims,)# 创建观测空间到数字的字典self._create_2num_dicts(obs_encoder=obs_encoder)# 行为策略是随机的,epsilon-soft 策略self.behavior_policy = self.target_policy = self._epsilon_soft_policy# 如果是离线策略if self.off_policy:# 目标策略是确定性的,贪婪策略self.target_policy = self._greedy# 初始化 Q 函数self.parameters["Q"] = defaultdict(np.random.rand)# 初始化每个状态-动作对的回报对象self.derived_variables = {"episode_num": 0}# 超参数self.hyperparameters = {"agent": "TemporalDifferenceAgent","lr": self.lr,"obs_max": self.obs_max,"obs_min": self.obs_min,"epsilon": self.epsilon,"n_tilings": self.n_tilings,"grid_dims": self.grid_dims,"off_policy": self.off_policy,"temporal_discount": self.temporal_discount,}# 记录每一集的历史数据self.episode_history = {"state_actions": [], "rewards": []}def run_episode(self, max_steps, render=False):"""Run the agent on a single episode without updating the priority queueor performing backups.Parameters----------max_steps : intThe maximum number of steps to run an episoderender : boolWhether to render the episode during trainingReturns-------reward : floatThe total reward on the episode, averaged over the theta samples.steps : floatThe total number of steps taken on the episode, averaged over thetheta samples."""# 调用 _episode 方法运行一个单独的 episode,不更新优先级队列或执行备份return self._episode(max_steps, render, update=False)def train_episode(self, max_steps, render=False):"""Train the agent on a single episode.Parameters----------max_steps : intThe maximum number of steps to run an episode.render : boolWhether to render the episode during training.Returns-------reward : floatThe total reward on the episode.steps : floatThe number of steps taken on the episode."""# 获取派生变量D = self.derived_variables# 调用 _episode 方法训练一个单独的 episodetotal_rwd, n_steps = self._episode(max_steps, render, update=True)# 更新 episode_numD["episode_num"] += 1return total_rwd, n_steps# 定义一个方法,用于运行或训练智能体在一个 episode 上def _episode(self, max_steps, render, update=True):"""Run or train the agent on an episode.Parameters----------max_steps : intThe maximum number of steps to run the episode.render : boolWhether to render the episode during training.update : boolWhether to perform the Q function backups after each step. 
Defaultis True.Returns-------reward : floatThe total reward on the episode.steps : floatThe number of steps taken on the episode."""# 清空 episode 历史记录self.flush_history()# 重置环境并获取初始观察obs = self.env.reset()HS = self.episode_history# 根据当前观察选择动作action = self.act(obs)s = self._obs2num[obs]a = self._action2num[action]# 存储初始的 (状态, 动作) 元组HS["state_actions"].append((s, a))total_reward, n_steps = 0.0, 0for i in range(max_steps):if render:self.env.render()# 执行动作obs, reward, done, info = self.env.step(action)n_steps += 1# 记录奖励HS["rewards"].append(reward)total_reward += reward# 生成下一个状态和动作action = self.act(obs)s_ = self._obs2num[obs] if not done else Nonea_ = self._action2num[action]# 存储下一个 (状态, 动作) 元组HS["state_actions"].append((s_, a_))# 如果需要更新 Q 函数,则执行更新if update:self.update()# 如果 episode 结束,则跳出循环if done:break# 返回总奖励和步数return total_reward, n_stepsdef _greedy(self, s, a=None):"""A greedy behavior policy. Only used when off-policy is true.Parameters----------s : int, float, or tupleThe state number for the current observation, as returned by``self._obs2num[obs]``a : int, float, or tupleThe action number in the current state, as returned by``self._action2num[obs]``. If None, sample an action from theaction probabilities in state `s`, otherwise, return theprobability of action `a` under the greedy policy. Default is None.Returns-------If `a` is None:action : int, float, or :py:class:`ndarray <numpy.ndarray>` as returned by ``self._num2action``If `a` is None, returns an action sampled from the distributionover actions defined by the greedy policy.If `a` is not None:action_prob : float in range [0, 1]If `a` is not None, returns the probability of `a` under thegreedy policy.""" # noqa: E501# 获取参数和环境信息P, E = self.parameters, self.env_info# 计算动作空间的总数n_actions = np.prod(E["n_actions_per_dim"])# 找到在当前状态下使得 Q 值最大的动作a_star = np.argmax([P["Q"][(s, aa)] for aa in range(n_actions)])# 如果 a 为 None,则从贪婪策略定义的动作分布中随机选择一个动作if a is None:out = self._num2action[a_star]# 如果 a 不为 None,则返回在贪婪策略下动作 a 的概率else:out = 1 if a == a_star else 0return outdef _on_policy_update(self, s, a, r, s_, a_):"""Update the Q function using the expected SARSA on-policy TD(0) update:Q[s, a] <- Q[s, a] + lr * [r + temporal_discount * E[Q[s', a'] | s'] - Q[s, a]]whereE[ Q[s', a'] | s'] is the expected value of the Q function over alla_ given that we're in state s' under the current policyNB. the expected SARSA update can be used for both on- and off-policymethods. In an off-policy context, if the target policy is greedy andthe expectation is taken wrt. 
the target policy then the expected SARSAupdate is exactly Q-learning.Parameters----------s : int as returned by `self._obs2num`The id for the state/observation at timestep t-1a : int as returned by `self._action2num`The id for the action taken at timestep t-1r : floatThe reward after taking action `a` in state `s` at timestep t-1s_ : int as returned by `self._obs2num`The id for the state/observation at timestep ta_ : int as returned by `self._action2num`The id for the action taken at timestep t"""Q, E, pi = self.parameters["Q"], self.env_info, self.behavior_policy# TODO: this assumes that all actions are available in each staten_actions = np.prod(E["n_actions_per_dim"])# compute the expected value of Q(s', a') given that we are in state s'E_Q = np.sum([pi(s_, aa) * Q[(s_, aa)] for aa in range(n_actions)]) if s_ else 0# perform the expected SARSA TD(0) updateqsa = Q[(s, a)]Q[(s, a)] = qsa + self.lr * (r + self.temporal_discount * E_Q - qsa)def _off_policy_update(self, s, a, r, s_):"""Update the `Q` function using the TD(0) Q-learning update:Q[s, a] <- Q[s, a] + lr * (r + temporal_discount * max_a { Q[s', a] } - Q[s, a])Parameters----------s : int as returned by `self._obs2num`The id for the state/observation at timestep `t-1`a : int as returned by `self._action2num`The id for the action taken at timestep `t-1`r : floatThe reward after taking action `a` in state `s` at timestep `t-1`s_ : int as returned by `self._obs2num`The id for the state/observation at timestep `t`"""Q, E = self.parameters["Q"], self.env_infon_actions = np.prod(E["n_actions_per_dim"])qsa = Q[(s, a)]Qs_ = [Q[(s_, aa)] for aa in range(n_actions)] if s_ else [0]Q[(s, a)] = qsa + self.lr * (r + self.temporal_discount * np.max(Qs_) - qsa)def update(self):"""Update the parameters of the model online after each new state-action."""H, HS = self.hyperparameters, self.episode_history(s, a), r = HS["state_actions"][-2], HS["rewards"][-1]s_, a_ = HS["state_actions"][-1]if H["off_policy"]:# 如果是离线策略更新,则调用_off_policy_update函数self._off_policy_update(s, a, r, s_)else:# 如果是在线策略更新,则调用_on_policy_update函数self._on_policy_update(s, a, r, s_, a_)# 定义一个方法,执行行为策略--一个用于在训练期间生成动作的 :math:`\epsilon`-soft 策略def act(self, obs):r"""Execute the behavior policy--an :math:`\epsilon`-soft policy used togenerate actions during training.Parameters----------obs : int, float, or :py:class:`ndarray <numpy.ndarray>` as returned by ``env.step(action)``An observation from the environment.Returns-------action : int, float, or :py:class:`ndarray <numpy.ndarray>`An action sampled from the distribution over actions defined by theepsilon-soft policy.""" # noqa: E501# 将观察值转换为数字s = self._obs2num[obs]# 调用行为策略方法,返回动作return self.behavior_policy(s)# 定义一个方法,执行一个确定性贪婪策略,使用当前代理参数def greedy_policy(self, max_steps, render=True):"""Execute a deterministic greedy policy using the current agentparameters.Parameters----------max_steps : intThe maximum number of steps to run the episode.render : boolWhether to render the episode during execution.Returns-------total_reward : floatThe total reward on the episode.n_steps : floatThe total number of steps taken on the episode."""# 清空历史记录self.flush_history()# 获取环境的初始观察值H = self.episode_historyobs = self.env.reset()total_reward, n_steps = 0.0, 0# 循环执行最大步数for i in range(max_steps):# 如果需要渲染环境,则渲染if render:self.env.render()# 将观察值转换为数字s = self._obs2num[obs]# 使用贪婪策略选择动作action = self._greedy(s)# 将动作转换为数字a = self._action2num[action]# 存储 (状态, 动作) 元组H["state_actions"].append((s, a))# 执行动作obs, reward, done, info = self.env.step(action)n_steps += 1# 
记录奖励H["rewards"].append(reward)total_reward += reward# 如果完成了一个 episode,则跳出循环if done:breakreturn total_reward, n_steps
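To make the two tabular backups above concrete, here is a minimal standalone sketch of the expected-SARSA and Q-learning TD(0) updates. It assumes `Q` is a `defaultdict(float)` keyed by `(state, action)` and that `pi(s, a)` is a hypothetical behavior-policy function returning action probabilities; neither is the agent's actual internal object.

from collections import defaultdict


def expected_sarsa_update(Q, pi, s, a, r, s_, n_actions, lr=0.1, gamma=0.9):
    # On-policy TD(0): Q[s, a] += lr * (r + gamma * E[Q[s', a'] | s'] - Q[s, a])
    E_Q = sum(pi(s_, aa) * Q[(s_, aa)] for aa in range(n_actions)) if s_ is not None else 0.0
    Q[(s, a)] += lr * (r + gamma * E_Q - Q[(s, a)])


def q_learning_update(Q, s, a, r, s_, n_actions, lr=0.1, gamma=0.9):
    # Off-policy TD(0): Q[s, a] += lr * (r + gamma * max_a' Q[s', a'] - Q[s, a])
    Qs_ = [Q[(s_, aa)] for aa in range(n_actions)] if s_ is not None else [0.0]
    Q[(s, a)] += lr * (r + gamma * max(Qs_) - Q[(s, a)])


# Example: a uniform stand-in policy over two actions
Q = defaultdict(float)
uniform_pi = lambda s, a: 0.5
expected_sarsa_update(Q, uniform_pi, s=0, a=1, r=1.0, s_=2, n_actions=2)
q_learning_update(Q, s=0, a=1, r=1.0, s_=2, n_actions=2)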
# Define a DynaAgent class that inherits from AgentBase
class DynaAgent(AgentBase):# 初始化方法,接受多个参数def __init__(self,env, # 环境对象lr=0.4, # 学习率,默认值为0.4epsilon=0.1, # ε-greedy策略中的ε值,默认为0.1n_tilings=8, # 瓦片编码中的瓦片数量,默认为8obs_max=None, # 观测值的最大值,默认为Noneobs_min=None, # 观测值的最小值,默认为Noneq_plus=False, # 是否使用Q+学习算法,默认为Falsegrid_dims=[8, 8], # 网格维度,默认为[8, 8]explore_weight=0.05, # 探索权重,默认为0.05temporal_discount=0.9, # 时间折扣因子,默认为0.9n_simulated_actions=50, # 模拟动作的数量,默认为50# 初始化参数def _init_params(self):# 获取环境信息E = self.env_info# 确保动作空间是离散的assert not E["continuous_actions"], "Action space must be discrete"# 初始化观测编码器obs_encoder = None# 如果观测是连续的if E["continuous_observations"]:# 对状态空间进行切片obs_encoder, _ = tile_state_space(self.env,self.env_info,self.n_tilings,state_action=False,obs_max=self.obs_max,obs_min=self.obs_min,grid_size=self.grid_dims,)# 创建状态编码器和动作编码器的字典self._create_2num_dicts(obs_encoder=obs_encoder)# 设置行为策略和目标策略为 epsilon-soft 策略self.behavior_policy = self.target_policy = self._epsilon_soft_policy# 初始化 Q 函数和模型self.parameters["Q"] = defaultdict(np.random.rand)self.parameters["model"] = EnvModel()# 初始化每个状态-动作对的返回对象self.derived_variables = {"episode_num": 0,"sweep_queue": {},"visited": set(),"steps_since_last_visit": defaultdict(lambda: 0),}# 如果使用 Q+ 算法if self.q_plus:self.derived_variables["steps_since_last_visit"] = defaultdict(np.random.rand,)# 设置超参数self.hyperparameters = {"agent": "DynaAgent","lr": self.lr,"q_plus": self.q_plus,"obs_max": self.obs_max,"obs_min": self.obs_min,"epsilon": self.epsilon,"n_tilings": self.n_tilings,"grid_dims": self.grid_dims,"explore_weight": self.explore_weight,"temporal_discount": self.temporal_discount,"n_simulated_actions": self.n_simulated_actions,}# 初始化每一集的历史记录self.episode_history = {"state_actions": [], "rewards": []}# 执行行为策略--一个用于在训练期间生成动作的ε-soft策略def act(self, obs):# 将环境返回的观测转换为数字形式s = self._obs2num[obs]# 从由ε-soft策略定义的动作分布中采样一个动作return self.behavior_policy(s)# 定义一个贪婪的行为策略函数def _greedy(self, s, a=None):"""A greedy behavior policy.Parameters----------s : int, float, or tupleThe state number for the current observation, as returned byself._obs2num[obs]a : int, float, or tupleThe action number in the current state, as returned byself._action2num[obs]. If None, sample an action from the actionprobabilities in state s, otherwise, return the probability ofaction `a` under the greedy policy. Default is None.Returns-------If `a` is None:action : int, float, or :py:class:`ndarray <numpy.ndarray>` as returned by :meth:`_num2action`If `a` is None, returns an action sampled from the distributionover actions defined by the greedy policy.If `a` is not None:action_prob : float in range [0, 1]If `a` is not None, returns the probability of `a` under thegreedy policy.""" # noqa: E501# 获取环境信息和 Q 值E, Q = self.env_info, self.parameters["Q"]# 计算动作空间的总数n_actions = np.prod(E["n_actions_per_dim"])# 找到在当前状态下使 Q 值最大的动作a_star = np.argmax([Q[(s, aa)] for aa in range(n_actions)])# 如果 a 为 None,则从贪婪策略定义的动作分布中随机选择一个动作if a is None:out = self._num2action[a_star]# 如果 a 不为 None,则返回 a 在贪婪策略下的概率else:out = 1 if a == a_star else 0return outdef update(self):"""Update the priority queue with the most recent (state, action) pair andperform random-sample one-step tabular Q-planning.Notes-----The planning algorithm uses a priority queue to retrieve thestate-action pairs from the agent's history which will result in thelargest change to its `Q`-value if backed up. When the first pair inthe queue is backed up, the effect on each of its predecessor pairs iscomputed. 
If the predecessor's priority is greater than a smallthreshold the pair is added to the queue and the process is repeateduntil either the queue is empty or we exceed `n_simulated_actions`updates."""# 获取最近的 (state, action) 对s, a = self.episode_history["state_actions"][-1]# 更新优先级队列self._update_queue(s, a)# 模拟行为self._simulate_behavior()def _update_queue(self, s, a):"""Update the priority queue by calculating the priority for (s, a) andinserting it into the queue if it exceeds a fixed (small) threshold.Parameters----------s : int as returned by `self._obs2num`The id for the state/observationa : int as returned by `self._action2num`The id for the action taken from state `s`"""# 获取派生变量中的优先级队列sweep_queue = self.derived_variables["sweep_queue"]# TODO: what's a good threshold here?# 计算 (s, a) 的优先级priority = self._calc_priority(s, a)# 如果优先级大于等于 0.001,则插入到优先级队列中if priority >= 0.001:if (s, a) in sweep_queue:sweep_queue[(s, a)] = max(priority, sweep_queue[(s, a)])else:sweep_queue[(s, a)] = prioritydef _calc_priority(self, s, a):"""计算状态动作对 (s, a) 的“优先级”。优先级 P 定义为:P = sum_{s_} p(s_) * abs(r + temporal_discount * max_a {Q[s_, a]} - Q[s, a])这对应于 TD(0) Q-learning 对 (s, a) 的绝对值大小的备份。Parameters----------s : int as returned by `self._obs2num`状态/观察的 ida : int as returned by `self._action2num`从状态 `s` 中采取的动作的 idReturns-------priority : float(s, a) 的全备份 TD(0) Q-learning 更新的绝对值大小"""priority = 0.0E = self.env_infoQ = self.parameters["Q"]env_model = self.parameters["model"]n_actions = np.prod(E["n_actions_per_dim"])outcome_probs = env_model.outcome_probs(s, a)for (r, s_), p_rs_ in outcome_probs:max_q = np.max([Q[(s_, aa)] for aa in range(n_actions)])P = p_rs_ * (r + self.temporal_discount * max_q - Q[(s, a)])priority += np.abs(P)return prioritydef _simulate_behavior(self):"""Perform random-sample one-step tabular Q-planning with prioritizedsweeping.Notes-----This approach uses a priority queue to retrieve the state-action pairsfrom the agent's history with largest change to their Q-values ifbacked up. When the first pair in the queue is backed up, the effect oneach of its predecessor pairs is computed. 
If the predecessor'spriority is greater than a small threshold the pair is added to thequeue and the process is repeated until either the queue is empty or wehave exceeded a `n_simulated_actions` updates."""# 获取环境模型和优先级队列env_model = self.parameters["model"]sweep_queue = self.derived_variables["sweep_queue"]# 进行一定次数的模拟行为for _ in range(self.n_simulated_actions):# 如果队列为空,则结束模拟if len(sweep_queue) == 0:break# 从队列中选择具有最大更新(优先级)的(s, a)对sq_items = list(sweep_queue.items())(s_sim, a_sim), _ = sorted(sq_items, key=lambda x: x[1], reverse=True)[0]# 从队列中删除条目del sweep_queue[(s_sim, a_sim)]# 使用完全备份版本的TD(0) Q-learning更新为(s_sim, a_sim)更新Q函数self._update(s_sim, a_sim)# 获取导致s_sim的所有(_s, _a)对(即s_sim的前导状态)pairs = env_model.state_action_pairs_leading_to_outcome(s_sim)# 如果前导状态的优先级超过阈值,则将其添加到队列中for (_s, _a) in pairs:self._update_queue(_s, _a)def _update(self, s, a):"""Update Q using a full-backup version of the TD(0) Q-learning update:Q(s, a) = Q(s, a) + lr *sum_{r, s'} [p(r, s' | s, a) * (r + gamma * max_a { Q(s', a) } - Q(s, a))]Parameters----------s : int as returned by ``self._obs2num``The id for the state/observationa : int as returned by ``self._action2num``The id for the action taken from state `s`"""# 初始化更新值为0update = 0.0# 获取环境模型、环境信息、派生变量和Q值env_model = self.parameters["model"]E, D, Q = self.env_info, self.derived_variables, self.parameters["Q"]# 计算动作空间的大小n_actions = np.prod(E["n_actions_per_dim"])# 从模型中采样奖励outcome_probs = env_model.outcome_probs(s, a)for (r, s_), p_rs_ in outcome_probs:# 如果启用Q+算法,根据上次访问时间给奖励加上一个“奖励”if self.q_plus:r += self.explore_weight * np.sqrt(D["steps_since_last_visit"][(s, a)])# 计算下一个状态的最大Q值max_q = np.max([Q[(s_, a_)] for a_ in range(n_actions)])# 更新值根据TD(0) Q-learning更新公式计算update += p_rs_ * (r + self.temporal_discount * max_q - Q[(s, a)])# 更新Q值Q[(s, a)] += self.lr * updatedef run_episode(self, max_steps, render=False):"""Run the agent on a single episode without performing `Q`-functionbackups.Parameters----------max_steps : intThe maximum number of steps to run an episode.render : boolWhether to render the episode during training.Returns-------reward : floatThe total reward on the episode.steps : floatThe number of steps taken on the episode."""# 运行一个不执行Q函数备份的单个episodereturn self._episode(max_steps, render, update=False)def train_episode(self, max_steps, render=False):"""Train the agent on a single episode.Parameters----------max_steps : intThe maximum number of steps to run an episode.render : boolWhether to render the episode during training.Returns-------reward : floatThe total reward on the episode.steps : floatThe number of steps taken on the episode."""# 获取派生变量D = self.derived_variables# 在一个 episode 上运行 _episode 方法,返回总奖励和步数total_rwd, n_steps = self._episode(max_steps, render, update=True)# 增加 episode_num 计数D["episode_num"] += 1# 返回总奖励和步数return total_rwd, n_stepsdef greedy_policy(self, max_steps, render=True):"""Execute a deterministic greedy policy using the current agentparameters.Parameters----------max_steps : intThe maximum number of steps to run the episode.render : boolWhether to render the episode during execution.Returns-------total_reward : floatThe total reward on the episode.n_steps : floatThe total number of steps taken on the episode."""# 清空历史记录self.flush_history()# 获取 episode_historyH = self.episode_history# 重置环境并获取初始观察obs = self.env.reset()total_reward, n_steps = 0.0, 0for i in range(max_steps):# 如果需要渲染,显示环境if render:self.env.render()# 将观察转换为数字s = self._obs2num[obs]# 使用贪婪策略选择动作action = self._greedy(s)# 将动作转换为数字a = self._action2num[action]# 存储 (状态, 动作) 
元组H["state_actions"].append((s, a))# 执行动作obs, reward, done, info = self.env.step(action)n_steps += 1# 记录奖励H["rewards"].append(reward)total_reward += reward# 如果 episode 结束,跳出循环if done:break# 返回总奖励和步数return total_reward, n_steps
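A minimal usage sketch for the Dyna agent above. The environment name and hyperparameter values are illustrative, and the older `gym` reset/step API is assumed.

import gym
from numpy_ml.rl_models.agents import DynaAgent

env = gym.make("FrozenLake-v0")   # illustrative; any small discrete-state env works
agent = DynaAgent(env, q_plus=False, n_simulated_actions=25)

for episode in range(500):
    total_reward, n_steps = agent.train_episode(max_steps=200)

# Evaluate the learned Q-function with a deterministic greedy rollout
greedy_reward, _ = agent.greedy_policy(max_steps=200, render=False)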
RL Models
The agents.py module implements a number of standard reinforcement learning (RL) agents that can be run on OpenAI gym environments.

- Monte Carlo Methods
  - First-visit Monte Carlo updates (on-policy)
  - Incremental weighted importance sampling (off-policy)
  - Cross-entropy method (Mannor, Rubinstein, & Gat, 2003)
- Temporal-Difference Methods
  - SARSA (on-policy) (Rummery & Niranjan, 1994)
  - Q-learning (off-policy) (Watkins, 1989)
- Model-Based Methods
  - Dyna-Q / Dyna-Q+ with prioritized sweeping (Sutton, 1990; Moore & Atkeson, 1993)
numpy-ml\numpy_ml\rl_models\rl_utils.py
# Import the warnings module
import warnings

# Import the product function and the defaultdict class
from itertools import product
from collections import defaultdict

# Import numpy
import numpy as np

# Import the custom DependencyWarning class
from numpy_ml.utils.testing import DependencyWarning

# Import the tiles and IHT helpers
from numpy_ml.rl_models.tiles.tiles3 import tiles, IHT

# Initialize NO_PD to False
NO_PD = False

# Try to import pandas; if it is unavailable, set NO_PD to True
try:
    import pandas as pd
except ModuleNotFoundError:
    NO_PD = True

# Try to import gym; if it is unavailable, issue a warning
try:
    import gym
except ModuleNotFoundError:
    fstr = (
        "Agents in `numpy_ml.rl_models` use the OpenAI gym for training. "
        "To install the gym environments, run `pip install gym`. For more"
        " information, see https://github.com/openai/gym."
    )
    warnings.warn(fstr, DependencyWarning)


# A simple tabular environment model
class EnvModel(object):"""A simple tabular environment model that maintains the counts of eachreward-outcome pair given the state and action that preceded them. Themodel can be queried with>>> M = EnvModel()>>> M[(state, action, reward, next_state)] += 1>>> M[(state, action, reward, next_state)]1>>> M.state_action_pairs()[(state, action)]>>> M.outcome_probs(state, action)[(next_state, 1)]"""# 初始化方法def __init__(self):super(EnvModel, self).__init__()# 使用 defaultdict 创建一个嵌套字典作为环境模型self._model = defaultdict(lambda: defaultdict(lambda: 0))# 设置方法,设置环境模型中的值def __setitem__(self, key, value):"""Set self[key] to value"""s, a, r, s_ = keyself._model[(s, a)][(r, s_)] = value# 获取方法,获取环境模型中的值def __getitem__(self, key):"""Return the value associated with key"""s, a, r, s_ = keyreturn self._model[(s, a)][(r, s_)]# 包含方法,判断环境模型是否包含某个键def __contains__(self, key):"""True if EnvModel contains `key`, else False"""s, a, r, s_ = key# 判断状态-动作对和奖励-下一个状态对是否在环境模型中p1 = (s, a) in self.state_action_pairs()p2 = (r, s_) in self.reward_outcome_pairs()return p1 and p2# 返回环境模型中所有状态和动作对def state_action_pairs(self):"""Return all (state, action) pairs in the environment model"""return list(self._model.keys())# 返回在状态`s`中采取动作`a`时关联的所有奖励和下一个状态对def reward_outcome_pairs(self, s, a):"""Return all (reward, next_state) pairs associated with taking action `a`in state `s`."""return list(self._model[(s, a)].keys())# 返回在状态`s`中采取动作`a`后每个可能结果状态的环境模型概率def outcome_probs(self, s, a):"""Return the probability under the environment model of each outcomestate after taking action `a` in state `s`.Parameters----------s : int as returned by ``self._obs2num``The id for the state/observation.a : int as returned by ``self._action2num``The id for the action taken from state `s`.Returns-------outcome_probs : list of (state, prob) tuplesA list of each possible outcome and its associated probabilityunder the model."""items = list(self._model[(s, a)].items())total_count = np.sum([c for (_, c) in items])outcome_probs = [c / total_count for (_, c) in items]outcomes = [p for (p, _) in items]return list(zip(outcomes, outcome_probs))# 返回所有具有在当前模型下产生`outcome`的非零概率的状态和动作对def state_action_pairs_leading_to_outcome(self, outcome):"""Return all (state, action) pairs that have a nonzero probability ofproducing `outcome` under the current model.Parameters----------outcome : intThe outcome state.Returns-------pairs : list of (state, action) tuplesA list of all (state, action) pairs with a nonzero probability ofproducing `outcome` under the model."""pairs = []for sa in self.state_action_pairs():outcomes = [o for (r, o) in self.reward_outcome_pairs(*sa)]if outcome in outcomes:pairs.append(sa)return pairs
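A short sketch of how the model's transition counts turn into outcome probabilities; the state, action, and reward values are made up for illustration.

from numpy_ml.rl_models.rl_utils import EnvModel

M = EnvModel()

# Record three transitions from (state=0, action=1): two land in state 2 with
# reward 1.0, one lands in state 3 with reward 0.0
M[(0, 1, 1.0, 2)] += 1
M[(0, 1, 1.0, 2)] += 1
M[(0, 1, 0.0, 3)] += 1

print(M.state_action_pairs())   # [(0, 1)]
print(M.outcome_probs(0, 1))    # [((1.0, 2), 0.666...), ((0.0, 3), 0.333...)]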
# Define a function that encodes the continuous observations generated by the
# environment as a collection of overlapping tilings of the state space
def tile_state_space(
    env,  # the openAI environment object
    env_stats,  # environment statistics dict
    n_tilings,  # number of overlapping tilings to use; should be a power of 2
    obs_max=None,  # max value of the observation space (default: env.observation_space.high)
    obs_min=None,  # min value of the observation space (default: env.observation_space.low)
    state_action=False,  # tile-code state-action values (True) or state values only (False)
    grid_size=(4, 4),  # coarseness of each tiling; (4, 4) means each tiling is a 4x4 grid
):"""Return a function to encode the continous observations generated by `env`in terms of a collection of `n_tilings` overlapping tilings (each withdimension `grid_size`) of the state space.Arguments---------env : ``gym.wrappers.time_limit.TimeLimit`` instanceAn openAI environment.n_tilings : intThe number of overlapping tilings to use. Should be a power of 2. Thisdetermines the dimension of the discretized tile-encoded state vector.obs_max : float or np.ndarrayThe value to treat as the max value of the observation space whencalculating the grid widths. If None, use``env.observation_space.high``. Default is None.obs_min : float or np.ndarrayThe value to treat as the min value of the observation space whencalculating the grid widths. If None, use``env.observation_space.low``. Default is None.state_action : boolWhether to use tile coding to encode state-action values (True) or juststate values (False). Default is False.grid_size : list of length 2A list of ints representing the coarseness of the tilings. E.g., a`grid_size` of [4, 4] would mean each tiling consisted of a 4x4 tilegrid. Default is [4, 4].Returns-------encode_obs_as_tile : functionA function which takes as input continous observation vector andreturns a set of the indices of the active tiles in the tile codedobservation space.n_states : intAn integer reflecting the total number of unique states possible underthis tile coding regimen."""# 如果obs_max为None,则将env.observation_space.high转换为数值,否则保持obs_max不变obs_max = np.nan_to_num(env.observation_space.high) if obs_max is None else obs_max# 如果obs_min为None,则将env.observation_space.low转换为数值,否则保持obs_min不变obs_min = np.nan_to_num(env.observation_space.low) if obs_min is None else obs_min# 如果状态动作存在if state_action:# 如果环境统计中包含元组动作if env_stats["tuple_action"]:# 计算每个动作空间的数量n = [space.n - 1.0 for space in env.action_spaces.spaces]else:# 获取环境动作空间的数量n = [env.action_space.n]# 更新观测最大值和最小值obs_max = np.concatenate([obs_max, n])obs_min = np.concatenate([obs_min, np.zeros_like(n)])# 计算观测范围obs_range = obs_max - obs_min# 计算缩放比例scale = 1.0 / obs_range# 定义缩放观测向量的函数scale_obs = lambda obs: obs * scale # noqa: E731# 计算总瓦片数和总状态数n_tiles = np.prod(grid_size) * n_tilingsn_states = np.prod([n_tiles - i for i in range(n_tilings)])# 创建指示器哈希表iht = IHT(16384)# 定义将观测编码为瓦片的函数def encode_obs_as_tile(obs):# 缩放观测向量obs = scale_obs(obs)return tuple(tiles(iht, n_tilings, obs))# 返回编码观测为瓦片的函数和总状态数return encode_obs_as_tile, n_states
# Return a list of the IDs of all valid OpenAI ``gym`` environments
def get_gym_environs():
    return [e.id for e in gym.envs.registry.all()]


# Return a pandas DataFrame of environment IDs and their statistics
def get_gym_stats():df = []# 遍历所有 gym 环境for e in gym.envs.registry.all():# 打印环境 IDprint(e.id)# 获取环境统计信息并添加到 DataFrame 中df.append(env_stats(gym.make(e.id)))cols = ["id","continuous_actions","continuous_observations","action_dim",# "action_ids","deterministic","multidim_actions","multidim_observations","n_actions_per_dim","n_obs_per_dim","obs_dim",# "obs_ids","seed","tuple_actions","tuple_observations",]# 如果没有安装 pandas,则返回列表,否则返回 DataFramereturn df if NO_PD else pd.DataFrame(df)[cols]# 检查环境的动作和观察空间是否为 ``gym.spaces.Tuple`` 或 ``gym.spaces.Dict``
def is_tuple(env):tuple_space, dict_space = gym.spaces.Tuple, gym.spaces.dict.Dict# 检查动作空间是否为 Tuple 或 Dicttuple_action = isinstance(env.action_space, (tuple_space, dict_space))# 检查观察空间是否为 Tuple 或 Dicttuple_obs = isinstance(env.observation_space, (tuple_space, dict_space))return tuple_action, tuple_obs# 检查环境的动作和观察空间是否为多维空间或 ``Tuple`` 空间
def is_multidimensional(env):# 多维空间是指动作/观察空间中有多个元素的空间,包括 ``Tuple`` 空间includes single action/observation spaces with several dimensions.Parameters----------env : ``gym.wrappers`` or ``gym.envs`` instanceThe environment to evaluate.Returns-------md_action : boolWhether the `env`'s action space is multidimensional.md_obs : boolWhether the `env`'s observation space is multidimensional.tuple_action : boolWhether the `env`'s action space is a ``Tuple`` instance.tuple_obs : boolWhether the `env`'s observation space is a ``Tuple`` instance."""# 初始化变量,假设环境的动作空间和观测空间都是多维的md_action, md_obs = True, True# 检查环境的动作空间和观测空间是否为元组类型tuple_action, tuple_obs = is_tuple(env)# 如果动作空间不是元组类型if not tuple_action:# 从动作空间中随机采样一个动作act = env.action_space.sample()# 判断采样的动作是否为列表、元组或者 NumPy 数组,并且长度大于1md_action = isinstance(act, (list, tuple, np.ndarray)) and len(act) > 1# 如果观测空间不是元组类型if not tuple_obs:# 获取观测空间对象OS = env.observation_space# 如果观测空间对象有 'low' 属性,则获取 'low' 属性值,否则随机采样一个观测obs = OS.low if "low" in dir(OS) else OS.sample() # sample causes problems# 判断采样的观测是否为列表、元组或者 NumPy 数组,并且长度大于1md_obs = isinstance(obs, (list, tuple, np.ndarray)) and len(obs) > 1# 返回动作空间是否多维、观测空间是否多维、动作空间是否为元组、观测空间是否为元组的结果return md_action, md_obs, tuple_action, tuple_obs
# Check whether the environment's observation and action spaces are continuous
def is_continuous(env, tuple_action, tuple_obs):# 导入 gym 库中的相关模块Continuous = gym.spaces.box.Box# 如果观测空间是元组类型if tuple_obs:# 获取环境的观测空间spaces = env.observation_space.spaces# 检查所有子空间是否为连续空间cont_obs = all(isinstance(s, Continuous) for s in spaces)else:# 检查观测空间是否为连续空间cont_obs = isinstance(env.observation_space, Continuous)# 如果动作空间是元组类型if tuple_action:# 获取环境的动作空间spaces = env.action_space.spaces# 检查所有子空间是否为连续空间cont_action = all(isinstance(s, Continuous) for s in spaces)else:# 检查动作空间是否为连续空间cont_action = isinstance(env.action_space, Continuous)# 返回动作空间是否连续和观测空间是否连续的布尔值return cont_action, cont_obs# 获取关于环境动作空间的信息
def action_stats(env, md_action, cont_action):# 参数 md_action 表示动作空间是否为多维的# 参数 cont_action 表示动作空间是否为连续的# 返回值 n_actions_per_dim 表示每个维度的动作空间可能的动作数量# 返回值 action_ids 表示空间内所有有效动作的列表,如果 cont_action 为 True,则为 None# 返回值 action_dim 表示单个动作的维度数量# 如果需要考虑动作,则初始化动作维度为1,动作ID为空,每个维度的动作数量为无穷大if cont_action:action_dim = 1action_ids = Nonen_actions_per_dim = [np.inf]# 如果需要考虑多维动作,则获取环境中动作空间的维度if md_action:action_dim = env.action_space.shape[0]n_actions_per_dim = [np.inf for _ in range(action_dim)]# 如果不需要考虑动作else:# 如果需要考虑多维动作if md_action:# 获取每个维度的动作数量,如果动作空间有属性"n"则获取其值,否则为无穷大n_actions_per_dim = [space.n if hasattr(space, "n") else np.inffor space in env.action_space.spaces]# 如果动作数量不为无穷大,则生成动作ID列表action_ids = (Noneif np.inf in n_actions_per_dimelse list(product(*[range(i) for i in n_actions_per_dim])))# 动作维度为动作数量列表的长度action_dim = len(n_actions_per_dim)# 如果不需要考虑多维动作else:# 初始化动作维度为1,每个维度的动作数量为环境中动作空间的数量,生成动作ID列表action_dim = 1n_actions_per_dim = [env.action_space.n]action_ids = list(range(n_actions_per_dim[0]))# 返回每个维度的动作数量列表,动作ID列表,动作维度return n_actions_per_dim, action_ids, action_dim
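A quick illustration of the discrete and continuous branches above; the environment names are illustrative and assume a classic-control `gym` install.

import gym
from numpy_ml.rl_models.rl_utils import action_stats

# Discrete action space (CartPole): two enumerable actions
n_per_dim, ids, dim = action_stats(gym.make("CartPole-v0"), md_action=False, cont_action=False)
print(n_per_dim, ids, dim)   # [2] [0, 1] 1

# Continuous action space (Pendulum): one dimension with infinitely many actions
n_per_dim, ids, dim = action_stats(gym.make("Pendulum-v0"), md_action=False, cont_action=True)
print(n_per_dim, ids, dim)   # [inf] None 1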
# Get information on the environment's observation space
def obs_stats(env, md_obs, cont_obs):
    """Get information on the observation space for `env`.

    Parameters
    ----------
    env : ``gym.wrappers`` or ``gym.envs`` instance
        The environment to evaluate.
    md_obs : bool
        Whether the `env`'s observation space is multidimensional.
    cont_obs : bool
        Whether the `env`'s observation space is continuous.

    Returns
    -------
    n_obs_per_dim : list of length (obs_dim,)
        The number of possible observation classes for each dimension of the
        observation space.
    obs_ids : list or None
        A list of all valid observations within the space. If `cont_obs` is
        True, this value will be None.
    obs_dim : int or None
        The number of dimensions in a single observation.
    """
    # If the observation space is continuous
    if cont_obs:
        # There is no enumerable list of observations
        obs_ids = None
        # The observation dimension is the first dimension of the observation space
        obs_dim = env.observation_space.shape[0]
        # Each dimension has infinitely many possible observation classes
        n_obs_per_dim = [np.inf for _ in range(obs_dim)]
    else:
        # If the (discrete) observation space is multidimensional
        if md_obs:
            # For each subspace, get the number of possible observation classes
            n_obs_per_dim = [
                space.n if hasattr(space, "n") else np.inf
                for space in env.observation_space.spaces
            ]
            # If any dimension is unbounded, set obs_ids to None; otherwise
            # enumerate all possible observation combinations
            obs_ids = (
                None
                if np.inf in n_obs_per_dim
                else list(product(*[range(i) for i in n_obs_per_dim]))
            )
            # The observation dimension is the number of subspaces
            obs_dim = len(n_obs_per_dim)
        else:
            # The observation space is one-dimensional and discrete
            obs_dim = 1
            # The number of possible observation classes
            n_obs_per_dim = [env.observation_space.n]
            # All possible observations
            obs_ids = list(range(n_obs_per_dim[0]))

    # Return the observation space information
    return n_obs_per_dim, obs_ids, obs_dim


# Compute statistics for the current environment
def env_stats(env):"""Compute statistics for the current environment.Parameters----------env : ``gym.wrappers`` or ``gym.envs`` instanceThe environment to evaluate.Returns-------env_info : dictA dictionary containing information about the action and observationspaces of `env`."""# 检查环境是否是多维度的,获取动作和观测空间的信息md_action, md_obs, tuple_action, tuple_obs = is_multidimensional(env)# 检查环境是否具有连续动作和连续观测cont_action, cont_obs = is_continuous(env, tuple_action, tuple_obs)# 获取动作的统计信息,包括每个维度的动作数量、动作的 ID 和动作的维度n_actions_per_dim, action_ids, action_dim = action_stats(env, md_action, cont_action,)# 获取观测的统计信息,包括每个维度的观测数量、观测的 ID 和观测的维度n_obs_per_dim, obs_ids, obs_dim = obs_stats(env, md_obs, cont_obs)# 构建环境信息字典,包括环境的 ID、种子、是否确定性环境、动作和观测的类型、维度等信息env_info = {"id": env.spec.id,"seed": env.spec.seed if "seed" in dir(env.spec) else None,"deterministic": bool(~env.spec.nondeterministic),"tuple_actions": tuple_action,"tuple_observations": tuple_obs,"multidim_actions": md_action,"multidim_observations": md_obs,"continuous_actions": cont_action,"continuous_observations": cont_obs,"n_actions_per_dim": n_actions_per_dim,"action_dim": action_dim,"n_obs_per_dim": n_obs_per_dim,"obs_dim": obs_dim,"action_ids": action_ids,"obs_ids": obs_ids,}# 返回环境信息字典return env_info
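For example, a rough sketch of what `env_stats` reports for a classic-control environment (environment name assumed):

import gym
from numpy_ml.rl_models.rl_utils import env_stats

info = env_stats(gym.make("CartPole-v0"))
print(info["continuous_observations"])   # True, a 4-d Box observation space
print(info["continuous_actions"])        # False, two discrete actions
print(info["n_actions_per_dim"])         # [2]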
numpy-ml\numpy_ml\rl_models\tiles\tiles3.py
"""
Tile Coding Software version 3.0beta
by Rich Sutton
based on a program created by Steph Schaeffer and others
External documentation and recommendations on the use of this code is available in the
reinforcement learning textbook by Sutton and Barto, and on the web.
These need to be understood before this code is.

This software is for Python 3 or more.

This is an implementation of grid-style tile codings, based originally on
the UNH CMAC code (see http://www.ece.unh.edu/robots/cmac.htm), but by now highly changed.
Here we provide a function, "tiles", that maps floating and integer
variables to a list of tiles, and a second function "tiles-wrap" that does the same while
wrapping some floats to provided widths (the lower wrap value is always 0).

The float variables will be gridded at unit intervals, so generalization
will be by approximately 1 in each direction, and any scaling will have
to be done externally before calling tiles.

Num-tilings should be a power of 2, e.g., 16. To make the offsetting work properly, it should
also be greater than or equal to four times the number of floats.

The first argument is either an index hash table of a given size (created by (make-iht size)),
an integer "size" (range of the indices from 0), or nil (for testing, indicating that the tile
coordinates are to be returned without being converted to indices).
"""# 导入 math 模块中的 floor 函数和 itertools 模块中的 zip_longest 函数
from math import floor
from itertools import zip_longest# 将 basehash 函数赋值给 basehash 变量
basehash = hash# 定义 IHT 类
class IHT:"Structure to handle collisions"# 初始化方法,接受一个参数 sizevaldef __init__(self, sizeval):# 设置实例变量 size 为传入的 sizevalself.size = sizeval# 初始化 overfullCount 为 0self.overfullCount = 0# 初始化 dictionary 为空字典self.dictionary = {}# 定义 __str__ 方法,用于对象打印时返回字符串def __str__(self):# 返回包含对象信息的字符串return ("Collision table:"+ " size:"+ str(self.size)+ " overfullCount:"+ str(self.overfullCount)+ " dictionary:"+ str(len(self.dictionary))+ " items")# 返回字典中键值对的数量def count(self):return len(self.dictionary)# 检查字典是否已满def fullp(self):return len(self.dictionary) >= self.size# 获取对象在字典中的索引,如果对象不存在且只读模式,则返回Nonedef getindex(self, obj, readonly=False):# 获取字典引用d = self.dictionary# 如果对象在字典中存在,则返回其索引if obj in d:return d[obj]# 如果对象不存在且为只读模式,则返回Noneelif readonly:return None# 获取字典大小和当前键值对数量size = self.sizecount = self.count()# 如果键值对数量大于等于字典大小if count >= size:# 如果超出计数为0,则打印信息if self.overfullCount == 0:print("IHT full, starting to allow collisions")# 增加超出计数self.overfullCount += 1# 返回对象的哈希值对字典大小取模作为索引return basehash(obj) % self.sizeelse:# 将对象添加到字典中,并返回其索引d[obj] = countreturn count
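A tiny sketch of the collision-table behavior, using a deliberately small table:

from numpy_ml.rl_models.tiles.tiles3 import IHT

iht = IHT(8)   # small table so the behavior is easy to see

print(iht.getindex((0, 1)))                   # 0, first new key takes the next free slot
print(iht.getindex((0, 2)))                   # 1
print(iht.getindex((0, 1)))                   # 0, existing keys always map to the same index
print(iht.getindex((9, 9), readonly=True))    # None, unknown key in read-only mode
print(iht.count())                            # 2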
# Return a hash index for the given coordinates, hash table (or size), and read-only flag
def hashcoords(coordinates, m, readonly=False):
    # If m is an IHT, use its getindex method to get the index
    if type(m) == IHT:
        return m.getindex(tuple(coordinates), readonly)
    # If m is an integer size, hash the coordinates and take the modulus
    if type(m) == int:
        return basehash(tuple(coordinates)) % m
    # If m is None, return the coordinates unchanged
    if m == None:
        return coordinates


# Return num-tilings tile indices corresponding to the floats and ints
def tiles(ihtORsize, numtilings, floats, ints=[], readonly=False):"""returns num-tilings tile indices corresponding to the floats and ints"""# 将浮点数乘以numtilings并向下取整qfloats = [floor(f * numtilings) for f in floats]Tiles = []for tiling in range(numtilings):tilingX2 = tiling * 2coords = [tiling]b = tilingfor q in qfloats:coords.append((q + b) // numtilings)b += tilingX2coords.extend(ints)Tiles.append(hashcoords(coords, ihtORsize, readonly))return Tiles# 返回num-tilings个瓦片索引,对应于浮点数和整数,其中一些浮点数进行了包装
def tileswrap(ihtORsize, numtilings, floats, wrapwidths, ints=[], readonly=False):"""returns num-tilings tile indices corresponding to the floats and ints,wrapping some floats"""qfloats = [floor(f * numtilings) for f in floats]Tiles = []for tiling in range(numtilings):tilingX2 = tiling * 2coords = [tiling]b = tilingfor q, width in zip_longest(qfloats, wrapwidths):c = (q + b % numtilings) // numtilingscoords.append(c % width if width else c)b += tilingX2coords.extend(ints)Tiles.append(hashcoords(coords, ihtORsize, readonly))return Tiles
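A brief usage sketch of `tiles` with an index hash table. The input values are arbitrary and assumed to be pre-scaled so that a unit step corresponds to the desired generalization width.

from numpy_ml.rl_models.tiles.tiles3 import IHT, tiles

iht = IHT(1024)

active = tiles(iht, 8, [3.6, 7.21])
print(active)                       # 8 tile indices, one per tiling

# A nearby point activates an overlapping, but not identical, set of tiles
print(tiles(iht, 8, [3.7, 7.21]))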
numpy-ml\numpy_ml\rl_models\tiles\__init__.py
# Import the tiles3 module from the current package
from . import tiles3
numpy-ml\numpy_ml\rl_models\trainer.py
from time import time
import numpy as np


# Define a Trainer class to facilitate agent training and evaluation
class Trainer(object):def __init__(self, agent, env):"""An object to facilitate agent training and evaluation.Parameters----------agent : :class:`AgentBase` instanceThe agent to train.env : ``gym.wrappers`` or ``gym.envs`` instanceThe environment to run the agent on."""# 初始化 Trainer 对象,设置 agent 和 env 属性self.env = envself.agent = agent# 初始化 rewards 字典,用于存储训练过程中的奖励和相关信息self.rewards = {"total": [], "smooth_total": [], "n_steps": [], "duration": []}def _train_episode(self, max_steps, render_every=None):# 记录当前时间t0 = time()if "train_episode" in dir(self.agent):# 如果 agent 中有 train_episode 方法,则在线训练更新reward, n_steps = self.agent.train_episode(max_steps)else:# 如果 agent 中没有 train_episode 方法,则离线训练更新reward, n_steps = self.agent.run_episode(max_steps)# 更新 agentself.agent.update()# 计算训练时长duration = time() - t0return reward, duration, n_stepsdef train(self,n_episodes,max_steps,seed=None,plot=True,verbose=True,render_every=None,smooth_factor=0.05,def plot_rewards(self, rwd_greedy):"""Plot the cumulative reward per episode as a function of episode number.Notes-----Saves plot to the file ``./img/<agent>-<env>.png``Parameters----------rwd_greedy : floatThe cumulative reward earned with a final execution of a greedytarget policy."""try:import matplotlib.pyplot as pltimport seaborn as sns# 设置 seaborn 库的样式为白色sns.set_style("white")# 设置 seaborn 库的上下文为 notebook,字体大小为 1sns.set_context("notebook", font_scale=1)except:fstr = "Error importing `matplotlib` and `seaborn` -- plotting functionality is disabled"# 如果导入 matplotlib 和 seaborn 失败,则抛出 ImportError 异常raise ImportError(fstr)# 获取累积奖励数据R = self.rewards# 创建图形和轴对象fig, ax = plt.subplots()# 创建 x 轴数据,表示每一轮的序号x = np.arange(len(R["total"]))# 创建 y 轴数据,表示平滑后的累积奖励y = R["smooth_total"]# 创建 y_raw 轴数据,表示原始的累积奖励y_raw = R["total"]# 绘制平滑后的累积奖励曲线ax.plot(x, y, label="smoothed")# 绘制原始的累积奖励曲线,透明度为 0.5ax.plot(x, y_raw, alpha=0.5, label="raw")# 添加一条虚线,表示最终贪婪策略的累积奖励ax.axhline(y=rwd_greedy, xmin=min(x), xmax=max(x), ls=":", label="final greedy")# 添加图例ax.legend()# 移除图形的上边界和右边界sns.despine()# 获取环境名称和智能体名称env = self.agent.env_info["id"]agent = self.agent.hyperparameters["agent"]# 设置 x 轴标签为 "Episode"ax.set_xlabel("Episode")# 设置 y 轴标签为 "Cumulative reward"ax.set_ylabel("Cumulative reward")# 设置图形标题为智能体名称和环境名称的组合ax.set_title("{} on '{}'".format(agent, env))# 保存图形到文件 img/<agent>-<env>.pngplt.savefig("img/{}-{}.png".format(agent, env))# 关闭所有图形plt.close("all")
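A minimal usage sketch for the trainer, pairing it with the `DynaAgent` defined earlier. The body of `train` is truncated above, so this follows only its visible signature; the environment name and episode counts are illustrative, and `plot=False` avoids the matplotlib/seaborn dependency.

import gym
from numpy_ml.rl_models.agents import DynaAgent
from numpy_ml.rl_models.trainer import Trainer

env = gym.make("FrozenLake-v0")               # illustrative environment
agent = DynaAgent(env)

trainer = Trainer(agent, env)
trainer.train(n_episodes=100, max_steps=200, plot=False, verbose=True)
print(trainer.rewards["total"][-5:])          # cumulative reward of the last 5 episodes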
numpy-ml\numpy_ml\rl_models\__init__.py
# Import the rl_utils module from the current package
from . import rl_utils

# Import the agents module from the current package
from . import agents

# Import the trainer module from the current package
from . import trainer

# Import the tiles module from the current package
from . import tiles
numpy-ml\numpy_ml\tests\nn_torch_models.py
# Disable flake8 checks
# Import the PyTorch library
import torch
import torch.nn as nn
import torch.nn.functional as F

# Import the TensorFlow library
import tensorflow as tf

# Import the NumPy library
import numpy as np

#######################################################################
#       Gold-standard implementations for testing custom layers       #
#                         (requires PyTorch)                          #
#######################################################################


# Convert the input to a PyTorch variable
def torchify(var, requires_grad=True):
    return torch.autograd.Variable(torch.FloatTensor(var), requires_grad=requires_grad)


# Build a PyTorch gradient calculator for the function `fn`
def torch_gradient_generator(fn, **kwargs):
    def get_grad(z):
        z1 = torch.autograd.Variable(torch.FloatTensor(z), requires_grad=True)
        z2 = fn(z1, **kwargs).sum()
        z2.backward()
        grad = z1.grad.numpy()
        return grad

    return get_grad


# Gradient of the cross-entropy loss with respect to the logits `z`
def torch_xe_grad(y, z):
    z = torch.autograd.Variable(torch.FloatTensor(z), requires_grad=True)
    y = torch.LongTensor(y.argmax(axis=1))
    loss = F.cross_entropy(z, y, reduction="sum")
    loss.backward()
    grad = z.grad.numpy()
    return grad


# Gradient of the mean-squared-error loss with respect to the pre-activation `z`
def torch_mse_grad(y, z, act_fn):
    y = torch.FloatTensor(y)
    z = torch.autograd.Variable(torch.FloatTensor(z), requires_grad=True)
    y_pred = act_fn(z)
    loss = F.mse_loss(y_pred, y, reduction="sum")  # size_average=False).sum()
    loss.backward()
    grad = z.grad.numpy()
    return grad


# PyTorch VAE loss class
class TorchVAELoss(nn.Module):def __init__(self):super(TorchVAELoss, self).__init__()# 从输入数据中提取梯度信息def extract_grads(self, X, X_recon, t_mean, t_log_var):# 定义一个极小的浮点数,用于处理梯度计算中的数值稳定性eps = np.finfo(float).eps# 将输入数据转换为 Torch 张量,并设置不需要梯度信息X = torchify(X, requires_grad=False)# 将重构后的输入数据转换为 Torch 张量,并进行数值裁剪,避免出现极端值X_recon = torchify(np.clip(X_recon, eps, 1 - eps))# 将均值数据转换为 Torch 张量t_mean = torchify(t_mean)# 将对数方差数据转换为 Torch 张量t_log_var = torchify(t_log_var)# 计算重构误差,使用二元交叉熵损失函数BCE = torch.sum(F.binary_cross_entropy(X_recon, X, reduction="none"), dim=1)# 计算 KL 散度,参考 VAE 论文的附录 B# Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014# https://arxiv.org/abs/1312.6114# 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)KLD = -0.5 * torch.sum(1 + t_log_var - t_mean.pow(2) - t_log_var.exp(), dim=1)# 计算总损失,包括重构误差和 KL 散度loss = torch.mean(BCE + KLD)# 反向传播计算梯度loss.backward()# 将损失值和各个梯度信息保存到字典中并返回grads = {"loss": loss.detach().numpy(),"dX_recon": X_recon.grad.numpy(),"dt_mean": t_mean.grad.numpy(),"dt_log_var": t_log_var.grad.numpy(),}return grads
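For reference, a NumPy sketch of the same loss (reconstruction binary cross-entropy plus KL divergence) that the PyTorch class above computes. It is a cross-check under the same clipping assumption, not part of the test suite.

import numpy as np


def numpy_vae_loss(X, X_recon, t_mean, t_log_var):
    eps = np.finfo(float).eps
    X_recon = np.clip(X_recon, eps, 1 - eps)
    # Per-example reconstruction term: binary cross-entropy summed over features
    BCE = -np.sum(X * np.log(X_recon) + (1 - X) * np.log(1 - X_recon), axis=1)
    # Per-example KL divergence between q(z | x) and the standard normal prior
    KLD = -0.5 * np.sum(1 + t_log_var - t_mean ** 2 - np.exp(t_log_var), axis=1)
    return np.mean(BCE + KLD)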
# 定义一个 TorchWGANGPLoss 类,继承自 nn.Module
class TorchWGANGPLoss(nn.Module):# 初始化函数,接受一个 lambda_ 参数,默认值为 10def __init__(self, lambda_=10):# 将 lambda_ 转换为张量形式self.lambda_ = torchify([lambda_])# 调用父类的初始化函数super(TorchWGANGPLoss, self).__init__()# 前向传播函数,接受 Y_real, Y_fake, gradInterp 三个参数def forward(self, Y_real, Y_fake, gradInterp):# 复制 Y_fake 到 GY_fakeGY_fake = Y_fake.copy()# 将 Y_real, Y_fake, GY_fake, gradInterp 转换为张量形式self.Y_real = torchify(Y_real)self.Y_fake = torchify(Y_fake)self.GY_fake = torchify(GY_fake)self.gradInterp = torchify(gradInterp)# 计算梯度惩罚norm = self.gradInterp.norm(2, dim=1)self.norm1 = torch.sqrt(torch.sum(self.gradInterp.pow(2), dim=1))# 断言两种计算方式得到的结果应该非常接近assert torch.allclose(norm, self.norm1)# 计算梯度惩罚项self.gpenalty = self.lambda_ * ((self.norm1 - 1).pow(2)).mean()# 计算 C_loss 和 G_lossself.C_loss = self.Y_fake.mean() - self.Y_real.mean() + self.gpenaltyself.G_loss = -self.GY_fake.mean()# 提取梯度信息函数,接受 Y_real, Y_fake, gradInterp 三个参数def extract_grads(self, Y_real, Y_fake, gradInterp):# 调用前向传播函数self.forward(Y_real, Y_fake, gradInterp)# 计算 C_loss 和 G_loss 的梯度self.C_loss.backward()self.G_loss.backward()# 将各个梯度信息转换为 numpy 数组形式,存储在字典中并返回grads = {"Y_real": self.Y_real.detach().numpy(),"Y_fake": self.Y_fake.detach().numpy(),"gradInterp": self.gradInterp.detach().numpy(),"GP": self.gpenalty.detach().numpy(),"C_loss": self.C_loss.detach().numpy(),"G_loss": self.G_loss.detach().numpy(),"C_dY_real": self.Y_real.grad.numpy(),"C_dGradInterp": self.gradInterp.grad.numpy(),"C_dY_fake": self.Y_fake.grad.numpy(),"G_dY_fake": self.GY_fake.grad.numpy(),}return grads# 定义一个 TorchLinearActivation 类,继承自 nn.Module
class TorchLinearActivation(nn.Module):# 初始化函数def __init__(self):# 调用父类的初始化函数super(TorchLinearActivation, self).__init__()pass# 静态方法,实现前向传播@staticmethoddef forward(input):return input# 静态方法,实现反向传播@staticmethoddef backward(grad_output):return torch.ones_like(grad_output)# 定义一个 TorchBatchNormLayer 类,继承自 nn.Module
class TorchBatchNormLayer(nn.Module):# 初始化批量归一化层对象def __init__(self, n_in, params, mode, momentum=0.9, epsilon=1e-5):# 调用父类的初始化方法super(TorchBatchNormLayer, self).__init__()# 从参数中获取缩放因子和截距scaler = params["scaler"]intercept = params["intercept"]# 根据模式选择不同维度的批量归一化层if mode == "1D":self.layer1 = nn.BatchNorm1d(num_features=n_in, momentum=1 - momentum, eps=epsilon, affine=True)elif mode == "2D":self.layer1 = nn.BatchNorm2d(num_features=n_in, momentum=1 - momentum, eps=epsilon, affine=True)# 设置批量归一化层的权重和偏置self.layer1.weight = nn.Parameter(torch.FloatTensor(scaler))self.layer1.bias = nn.Parameter(torch.FloatTensor(intercept))# 前向传播函数def forward(self, X):# 调整输入张量的维度顺序,从(N, H, W, C)到(N, C, H, W)if X.ndim == 4:X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3])# 如果输入不是torch张量,则转换为torch张量if not isinstance(X, torch.Tensor):X = torchify(X)# 保存输入张量和经过批量归一化层后的输出张量self.X = Xself.Y = self.layer1(self.X)# 保留输出张量的梯度信息self.Y.retain_grad()# 从神经网络中提取梯度信息def extract_grads(self, X, Y_true=None):# 进行前向传播self.forward(X)# 如果真实标签是 NumPy 数组if isinstance(Y_true, np.ndarray):# 调整真实标签的维度顺序Y_true = np.moveaxis(Y_true, [0, 1, 2, 3], [0, -2, -1, -3])# 计算损失函数self.loss1 = (0.5 * F.mse_loss(self.Y, torchify(Y_true), size_average=False).sum())else:# 如果没有真实标签,直接将输出求和作为损失self.loss1 = self.Y.sum()# 反向传播计算梯度self.loss1.backward()# 将张量转换为 NumPy 数组X_np = self.X.detach().numpy()Y_np = self.Y.detach().numpy()dX_np = self.X.grad.numpy()dY_np = self.Y.grad.numpy()# 如果输入数据的维度为4if self.X.dim() == 4:orig, X_swap = [0, 1, 2, 3], [0, -1, -3, -2]# 调整真实标签的维度顺序if isinstance(Y_true, np.ndarray):Y_true = np.moveaxis(Y_true, orig, X_swap)X_np = np.moveaxis(X_np, orig, X_swap)Y_np = np.moveaxis(Y_np, orig, X_swap)dX_np = np.moveaxis(dX_np, orig, X_swap)dY_np = np.moveaxis(dY_np, orig, X_swap)# 构建梯度字典grads = {"loss": self.loss1.detach().numpy(),"X": X_np,"momentum": 1 - self.layer1.momentum,"epsilon": self.layer1.eps,"intercept": self.layer1.bias.detach().numpy(),"scaler": self.layer1.weight.detach().numpy(),"running_mean": self.layer1.running_mean.detach().numpy(),"running_var": self.layer1.running_var.detach().numpy(),"y": Y_np,"dLdy": dY_np,"dLdIntercept": self.layer1.bias.grad.numpy(),"dLdScaler": self.layer1.weight.grad.numpy(),"dLdX": dX_np,}# 如果真实标签是 NumPy 数组,将其加入梯度字典if isinstance(Y_true, np.ndarray):grads["Y_true"] = Y_true# 返回梯度字典return grads
# 定义一个继承自 nn.Module 的 TorchLayerNormLayer 类
class TorchLayerNormLayer(nn.Module):# 初始化方法,接受特征维度、参数、模式和 epsilon 参数def __init__(self, feat_dims, params, mode, epsilon=1e-5):super(TorchLayerNormLayer, self).__init__()# 创建 LayerNorm 层,指定特征维度、epsilon 值和是否启用 elementwise_affineself.layer1 = nn.LayerNorm(normalized_shape=feat_dims, eps=epsilon, elementwise_affine=True)# 从参数中获取 scaler 和 interceptscaler = params["scaler"]intercept = params["intercept"]# 如果模式为 "2D",则调整 scaler 和 intercept 的维度if mode == "2D":scaler = np.moveaxis(scaler, [0, 1, 2], [-2, -1, -3])intercept = np.moveaxis(intercept, [0, 1, 2], [-2, -1, -3])# 断言 scaler 和 intercept 的形状与 LayerNorm 层的权重和偏置形状相同assert scaler.shape == self.layer1.weight.shapeassert intercept.shape == self.layer1.bias.shape# 将 scaler 和 intercept 转换为 nn.Parameter 类型,并赋值给 LayerNorm 层的权重和偏置self.layer1.weight = nn.Parameter(torch.FloatTensor(scaler))self.layer1.bias = nn.Parameter(torch.FloatTensor(intercept))# 前向传播方法,接受输入 Xdef forward(self, X):# 如果输入 X 的维度为 4,则调整维度顺序为 (N, C, H, W)if X.ndim == 4:X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3])# 如果输入 X 不是 torch.Tensor 类型,则转换为 torch.Tensorif not isinstance(X, torch.Tensor):X = torchify(X)# 将输入 X 保存在 self.X 中,并通过 LayerNorm 层得到输出 self.Yself.X = Xself.Y = self.layer1(self.X)# 保留 self.Y 的梯度信息self.Y.retain_grad()# 从输入数据 X 中提取梯度信息,如果提供了真实标签 Y_true,则计算损失def extract_grads(self, X, Y_true=None):# 进行前向传播self.forward(X)# 如果 Y_true 是 numpy 数组,则调整其维度顺序if isinstance(Y_true, np.ndarray):Y_true = np.moveaxis(Y_true, [0, 1, 2, 3], [0, -2, -1, -3])# 计算损失函数self.loss1 = (0.5 * F.mse_loss(self.Y, torchify(Y_true), size_average=False).sum())else:# 如果没有提供 Y_true,则将损失设为 Y 的总和self.loss1 = self.Y.sum()# 反向传播计算梯度self.loss1.backward()# 将张量转换为 numpy 数组X_np = self.X.detach().numpy()Y_np = self.Y.detach().numpy()dX_np = self.X.grad.numpy()dY_np = self.Y.grad.numpy()intercept_np = self.layer1.bias.detach().numpy()scaler_np = self.layer1.weight.detach().numpy()dIntercept_np = self.layer1.bias.grad.numpy()dScaler_np = self.layer1.weight.grad.numpy()# 如果输入数据 X 的维度为 4,则调整维度顺序if self.X.dim() == 4:orig, X_swap = [0, 1, 2, 3], [0, -1, -3, -2]orig_p, p_swap = [0, 1, 2], [-1, -3, -2]if isinstance(Y_true, np.ndarray):Y_true = np.moveaxis(Y_true, orig, X_swap)X_np = np.moveaxis(X_np, orig, X_swap)Y_np = np.moveaxis(Y_np, orig, X_swap)dX_np = np.moveaxis(dX_np, orig, X_swap)dY_np = np.moveaxis(dY_np, orig, X_swap)scaler_np = np.moveaxis(scaler_np, orig_p, p_swap)intercept_np = np.moveaxis(intercept_np, orig_p, p_swap)dScaler_np = np.moveaxis(dScaler_np, orig_p, p_swap)dIntercept_np = np.moveaxis(dIntercept_np, orig_p, p_swap)# 构建梯度字典grads = {"loss": self.loss1.detach().numpy(),"X": X_np,"epsilon": self.layer1.eps,"intercept": intercept_np,"scaler": scaler_np,"y": Y_np,"dLdy": dY_np,"dLdIntercept": dIntercept_np,"dLdScaler": dScaler_np,"dLdX": dX_np,}# 如果提供了 Y_true,则将其加入梯度字典if isinstance(Y_true, np.ndarray):grads["Y_true"] = Y_true# 返回梯度字典return grads
class TorchAddLayer(nn.Module):# 定义 TorchAddLayer 类,继承自 nn.Moduledef __init__(self, act_fn, **kwargs):# 初始化函数,接受激活函数 act_fn 和其他关键字参数super(TorchAddLayer, self).__init__()# 调用父类的初始化函数self.act_fn = act_fn# 设置实例变量 act_fn 为传入的激活函数def forward(self, Xs):# 前向传播函数,接受输入 Xsself.Xs = []# 初始化实例变量 Xs 为空列表x = Xs[0].copy()# 复制输入列表中的第一个元素if not isinstance(x, torch.Tensor):# 如果 x 不是 torch.Tensor 类型x = torchify(x)# 将 x 转换为 torch.Tensor 类型self.sum = x.clone()# 克隆 x 并赋值给实例变量 sumx.retain_grad()# 保留 x 的梯度信息self.Xs.append(x)# 将 x 添加到 Xs 列表中for i in range(1, len(Xs)):# 遍历输入列表中的其他元素x = Xs[i]# 获取当前元素if not isinstance(x, torch.Tensor):# 如果 x 不是 torch.Tensor 类型x = torchify(x)# 将 x 转换为 torch.Tensor 类型x.retain_grad()# 保留 x 的梯度信息self.Xs.append(x)# 将 x 添加到 Xs 列表中self.sum += x# 将 x 加到 sum 中self.sum.retain_grad()# 保留 sum 的梯度信息self.Y = self.act_fn(self.sum)# 计算 sum 的激活值并赋值给实例变量 Yself.Y.retain_grad()# 保留 Y 的梯度信息return self.Y# 返回 Ydef extract_grads(self, X):# 提取梯度信息函数,接受输入 Xself.forward(X)# 调用前向传播函数self.loss = self.Y.sum()# 计算损失值并赋值给实例变量 lossself.loss.backward()# 反向传播计算梯度grads = {# 定义梯度字典"Xs": X,# 输入 X"Sum": self.sum.detach().numpy(),# sum 的值"Y": self.Y.detach().numpy(),# Y 的值"dLdY": self.Y.grad.numpy(),# Y 的梯度"dLdSum": self.sum.grad.numpy(),# sum 的梯度}grads.update(# 更新梯度字典{"dLdX{}".format(i + 1): xi.grad.numpy() for i, xi in enumerate(self.Xs)}# 遍历 Xs 列表,获取每个元素的梯度信息)return grads# 返回梯度字典class TorchMultiplyLayer(nn.Module):# 定义 TorchMultiplyLayer 类,继承自 nn.Moduledef __init__(self, act_fn, **kwargs):# 初始化函数,接受激活函数 act_fn 和其他关键字参数super(TorchMultiplyLayer, self).__init__()# 调用父类的初始化函数self.act_fn = act_fn# 设置实例变量 act_fn 为传入的激活函数def forward(self, Xs):# 前向传播函数,接受输入 Xsself.Xs = []# 初始化实例变量 Xs 为空列表x = Xs[0].copy()# 复制输入列表中的第一个元素if not isinstance(x, torch.Tensor):# 如果 x 不是 torch.Tensor 类型x = torchify(x)# 将 x 转换为 torch.Tensor 类型self.prod = x.clone()# 克隆 x 并赋值给实例变量 prodx.retain_grad()# 保留 x 的梯度信息self.Xs.append(x)# 将 x 添加到 Xs 列表中for i in range(1, len(Xs)):# 遍历输入列表中的其他元素x = Xs[i]# 获取当前元素if not isinstance(x, torch.Tensor):# 如果 x 不是 torch.Tensor 类型x = torchify(x)# 将 x 转换为 torch.Tensor 类型x.retain_grad()# 保留 x 的梯度信息self.Xs.append(x)# 将 x 添加到 Xs 列表中self.prod *= x# 将 x 乘到 prod 中self.prod.retain_grad()# 保留 prod 的梯度信息self.Y = self.act_fn(self.prod)# 计算 prod 的激活值并赋值给实例变量 Yself.Y.retain_grad()# 保留 Y 的梯度信息return self.Y# 返回 Y# 定义一个方法用于提取梯度信息def extract_grads(self, X):# 调用神经网络的前向传播方法self.forward(X)# 计算损失值,将所有元素求和self.loss = self.Y.sum()# 反向传播计算梯度self.loss.backward()# 构建包含各个梯度信息的字典grads = {"Xs": X, # 输入数据"Prod": self.prod.detach().numpy(), # 中间变量 prod 的值"Y": self.Y.detach().numpy(), # 神经网络输出的值"dLdY": self.Y.grad.numpy(), # 损失函数对 Y 的梯度"dLdProd": self.prod.grad.numpy(), # 损失函数对 prod 的梯度}# 更新字典,包含每个输入数据对应的梯度信息grads.update({"dLdX{}".format(i + 1): xi.grad.numpy() for i, xi in enumerate(self.Xs)})# 返回包含梯度信息的字典return grads
class TorchSkipConnectionIdentity(nn.Module):# 定义一个 TorchSkipConnectionIdentity 类,继承自 nn.Moduledef forward(self, X):# 定义 forward 方法,接受输入 Xif not isinstance(X, torch.Tensor):# 如果 X 不是 torch.Tensor 类型# 将 X 的维度从 (N, H, W, C) 调整为 (N, C, H, W)X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3])# 将调整后的 X 转换为 torch.Tensor 类型X = torchify(X)self.X = X# 保留 X 的梯度信息self.X.retain_grad()self.conv1_out = self.conv1(self.X)# 保留 conv1_out 的梯度信息self.conv1_out.retain_grad()self.act_fn1_out = self.act_fn(self.conv1_out)# 保留 act_fn1_out 的梯度信息self.act_fn1_out.retain_grad()self.batchnorm1_out = self.batchnorm1(self.act_fn1_out)# 保留 batchnorm1_out 的梯度信息self.batchnorm1_out.retain_grad()self.conv2_out = self.conv2(self.batchnorm1_out)# 保留 conv2_out 的梯度信息self.conv2_out.retain_grad()self.batchnorm2_out = self.batchnorm2(self.conv2_out)# 保留 batchnorm2_out 的梯度信息self.batchnorm2_out.retain_grad()self.layer3_in = self.batchnorm2_out + self.X# 保留 layer3_in 的梯度信息self.layer3_in.retain_grad()self.Y = self.act_fn(self.layer3_in)# 保留 Y 的梯度信息self.Y.retain_grad()class TorchCausalConv1d(torch.nn.Conv1d):"""https://github.com/pytorch/pytorch/issues/1333NB: this is only ensures that the convolution out length is the same asthe input length IFF stride = 1. Otherwise, in/out lengths will differ."""# 定义 TorchCausalConv1d 类,继承自 torch.nn.Conv1ddef __init__(self,in_channels,out_channels,kernel_size,stride=1,dilation=1,groups=1,bias=True,):# 初始化方法,接受输入参数self.__padding = (kernel_size - 1) * dilationsuper(TorchCausalConv1d, self).__init__(in_channels,out_channels,kernel_size=kernel_size,stride=stride,padding=self.__padding,dilation=dilation,groups=groups,bias=bias,)def forward(self, input):# 定义 forward 方法,接受输入 inputresult = super(TorchCausalConv1d, self).forward(input)if self.__padding != 0:return result[:, :, : -self.__padding]return resultclass TorchWavenetModule(nn.Module):# 定义 TorchWavenetModule 类,继承自 nn.Module# 初始化 TorchWavenetModule 类,接受参数 params, hparams, conv_1x1_paddef __init__(self, params, hparams, conv_1x1_pad):# 调用父类的初始化方法super(TorchWavenetModule, self).__init__()# 创建 TorchCausalConv1d 对象,用于实现膨胀卷积self.conv_dilation = TorchCausalConv1d(in_channels=hparams["components"]["conv_dilation"]["in_ch"],out_channels=hparams["components"]["conv_dilation"]["out_ch"],kernel_size=hparams["components"]["conv_dilation"]["kernel_width"],stride=hparams["components"]["conv_dilation"]["stride"],dilation=hparams["components"]["conv_dilation"]["dilation"] + 1,bias=True,)# 创建 nn.Conv1d 对象,用于实现 1x1 卷积self.conv_1x1 = nn.Conv1d(in_channels=hparams["components"]["conv_1x1"]["in_ch"],out_channels=hparams["components"]["conv_1x1"]["out_ch"],kernel_size=hparams["components"]["conv_1x1"]["kernel_width"],stride=hparams["components"]["conv_1x1"]["stride"],padding=conv_1x1_pad,dilation=hparams["components"]["conv_1x1"]["dilation"] + 1,bias=True,)# 初始化膨胀卷积的权重和偏置W = params["components"]["conv_dilation"]["W"]b = params["components"]["conv_dilation"]["b"]W = np.moveaxis(W, [0, 1, 2], [-1, -2, -3]) # 调整权重的维度顺序self.conv_dilation.weight = nn.Parameter(torch.FloatTensor(W))self.conv_dilation.bias = nn.Parameter(torch.FloatTensor(b.flatten()))assert self.conv_dilation.weight.shape == W.shapeassert self.conv_dilation.bias.shape == b.flatten().shape# 初始化 1x1 卷积的权重和偏置W = params["components"]["conv_1x1"]["W"]b = params["components"]["conv_1x1"]["b"]W = np.moveaxis(W, [0, 1, 2], [-1, -2, -3]) # 调整权重的维度顺序self.conv_1x1.weight = nn.Parameter(torch.FloatTensor(W))self.conv_1x1.bias = nn.Parameter(torch.FloatTensor(b.flatten()))assert self.conv_1x1.weight.shape == W.shapeassert 
self.conv_1x1.bias.shape == b.flatten().shapedef forward(self, X_main, X_skip):# 将输入数据的维度顺序从(N, W, C)转换为(N, C, W)self.X_main = np.moveaxis(X_main, [0, 1, 2], [0, -1, -2])# 将转换后的数据转换为torch张量self.X_main = torchify(self.X_main)# 保留梯度信息self.X_main.retain_grad()# 使用卷积扩张操作处理转换后的数据self.conv_dilation_out = self.conv_dilation(self.X_main)self.conv_dilation_out.retain_grad()# 对卷积扩张输出进行tanh和sigmoid激活函数处理self.tanh_out = torch.tanh(self.conv_dilation_out)self.sigm_out = torch.sigmoid(self.conv_dilation_out)# 保留梯度信息self.tanh_out.retain_grad()self.sigm_out.retain_grad()# 将tanh和sigmoid输出相乘self.multiply_gate_out = self.tanh_out * self.sigm_outself.multiply_gate_out.retain_grad()# 使用1x1卷积处理相乘结果self.conv_1x1_out = self.conv_1x1(self.multiply_gate_out)self.conv_1x1_out.retain_grad()# 初始化X_skip为与conv_1x1_out相同形状的全零张量self.X_skip = torch.zeros_like(self.conv_1x1_out)# 如果X_skip不为空,则将其转换为torch张量if X_skip is not None:self.X_skip = torchify(np.moveaxis(X_skip, [0, 1, 2], [0, -1, -2]))self.X_skip.retain_grad()# 计算Y_skip和Y_mainself.Y_skip = self.X_skip + self.conv_1x1_outself.Y_main = self.X_main + self.conv_1x1_out# 保留梯度信息self.Y_skip.retain_grad()self.Y_main.retain_grad()
class TorchSkipConnectionConv(nn.Module):def __init__(self, act_fn, pad1, pad2, pad_skip, params, hparams, momentum=0.9, epsilon=1e-5# 初始化函数,定义了跳跃连接卷积层的参数和超参数def forward(self, X):# 检查输入是否为 torch.Tensor 类型,如果不是则进行转换if not isinstance(X, torch.Tensor):# 将输入的维度顺序从 (N, H, W, C) 调整为 (N, C, H, W)X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3])X = torchify(X)self.X = Xself.X.retain_grad()# 对输入进行第一次卷积操作self.conv1_out = self.conv1(self.X)self.conv1_out.retain_grad()# 对第一次卷积结果应用激活函数self.act_fn1_out = self.act_fn(self.conv1_out)self.act_fn1_out.retain_grad()# 对激活函数输出进行批归一化self.batchnorm1_out = self.batchnorm1(self.act_fn1_out)self.batchnorm1_out.retain_grad()# 对批归一化结果进行第二次卷积操作self.conv2_out = self.conv2(self.batchnorm1_out)self.conv2_out.retain_grad()# 对第二次卷积结果进行批归一化self.batchnorm2_out = self.batchnorm2(self.conv2_out)self.batchnorm2_out.retain_grad()# 对输入进行跳跃连接卷积操作self.c_skip_out = self.conv_skip(self.X)self.c_skip_out.retain_grad()# 对跳跃连接卷积结果进行批归一化self.bn_skip_out = self.batchnorm_skip(self.c_skip_out)self.bn_skip_out.retain_grad()# 将第二次卷积结果和跳跃连接卷积结果相加作为第三层的输入self.layer3_in = self.batchnorm2_out + self.bn_skip_outself.layer3_in.retain_grad()# 对第三层的输入应用激活函数self.Y = self.act_fn(self.layer3_in)self.Y.retain_grad()class TorchBidirectionalLSTM(nn.Module):def forward(self, X):# 将输入的维度顺序从 (batch, input_size, seq_len) 调整为 (seq_len, batch, input_size)self.X = np.moveaxis(X, [0, 1, 2], [-2, -1, -3])# 检查输入是否为 torch.Tensor 类型,如果不是则进行转换if not isinstance(self.X, torch.Tensor):self.X = torchify(self.X)self.X.retain_grad()# 初始化隐藏状态为 0n_ex, n_in, n_timesteps = self.X.shapen_out, n_out = self.layer1.weight_hh_l0.shape# 前向传播self.A, (At, Ct) = self.layer1(self.X)self.A.retain_grad()return self.Aclass TorchPool2DLayer(nn.Module):# 初始化 TorchPool2DLayer 类,设置输入通道数和超参数def __init__(self, in_channels, hparams, **kwargs):# 调用父类的初始化方法super(TorchPool2DLayer, self).__init__()# 根据超参数中的模式选择不同的池化层if hparams["mode"] == "max":self.layer1 = nn.MaxPool2d(kernel_size=hparams["kernel_shape"],padding=hparams["pad"],stride=hparams["stride"],)elif hparams["mode"] == "average":self.layer1 = nn.AvgPool2d(kernel_size=hparams["kernel_shape"],padding=hparams["pad"],stride=hparams["stride"],)# 前向传播函数def forward(self, X):# 将输入数据的维度顺序从 (N, H, W, C) 调整为 (N, C, H, W)self.X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3])# 如果输入数据不是 torch.Tensor 类型,则转换为 torch.Tensorif not isinstance(self.X, torch.Tensor):self.X = torchify(self.X)# 保留输入数据的梯度self.X.retain_grad()# 对输入数据进行池化操作,得到输出数据self.Y = self.layer1(self.X)# 保留输出数据的梯度self.Y.retain_grad()# 返回输出数据return self.Y# 提取梯度信息函数def extract_grads(self, X):# 运行前向传播函数得到输出数据self.forward(X)# 计算损失函数为输出数据的和self.loss = self.Y.sum()# 反向传播计算梯度self.loss.backward()# 调整梯度信息的维度顺序,以适应不同的表示方式orig, X_swap = [0, 1, 2, 3], [0, -1, -3, -2]grads = {"X": np.moveaxis(self.X.detach().numpy(), orig, X_swap),"y": np.moveaxis(self.Y.detach().numpy(), orig, X_swap),"dLdY": np.moveaxis(self.Y.grad.numpy(), orig, X_swap),"dLdX": np.moveaxis(self.X.grad.numpy(), orig, X_swap),}# 返回梯度信息字典return grads
# 定义一个 TorchConv2DLayer 类,继承自 nn.Module 类
class TorchConv2DLayer(nn.Module):# 初始化函数,接受输入通道数、输出通道数、激活函数、参数、超参数等参数def __init__(self, in_channels, out_channels, act_fn, params, hparams, **kwargs):# 调用父类的初始化函数super(TorchConv2DLayer, self).__init__()# 从参数中获取权重 W 和偏置 bW = params["W"]b = params["b"]# 保存激活函数self.act_fn = act_fn# 创建一个卷积层,设置输入通道数、输出通道数、卷积核形状、填充、步长、膨胀等参数self.layer1 = nn.Conv2d(in_channels,out_channels,hparams["kernel_shape"],padding=hparams["pad"],stride=hparams["stride"],dilation=hparams["dilation"] + 1,bias=True,)# 调整权重 W 的维度顺序,使其与卷积层的权重维度匹配W = np.moveaxis(W, [0, 1, 2, 3], [-2, -1, -3, -4])# 断言卷积层的权重形状与调整后的 W 的形状相同assert self.layer1.weight.shape == W.shape# 断言卷积层的偏置形状与展平后的 b 的形状相同assert self.layer1.bias.shape == b.flatten().shape# 将调整后的 W 转换为 PyTorch 的参数形式,并赋值给卷积层的权重self.layer1.weight = nn.Parameter(torch.FloatTensor(W))# 将展平后的 b 转换为 PyTorch 的参数形式,并赋值给卷积层的偏置self.layer1.bias = nn.Parameter(torch.FloatTensor(b.flatten()))# 前向传播函数,接受输入 X,进行卷积操作和激活函数操作,并返回结果def forward(self, X):# 调整输入 X 的维度顺序,使其与卷积层的输入维度匹配self.X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3])# 如果输入 X 不是 torch.Tensor 类型,则转换为 torch.Tensor 类型if not isinstance(self.X, torch.Tensor):self.X = torchify(self.X)# 保留输入 X 的梯度信息self.X.retain_grad()# 对输入 X 进行卷积操作,保存结果并保留梯度信息self.Z = self.layer1(self.X)self.Z.retain_grad()# 对卷积结果进行激活函数操作,保存结果并保留梯度信息self.Y = self.act_fn(self.Z)self.Y.retain_grad()# 返回激活函数的结果return self.Y# 提取梯度信息def extract_grads(self, X):# 进行前向传播self.forward(X)# 计算损失值self.loss = self.Y.sum()# 反向传播计算梯度self.loss.backward()# 定义坐标转换规则orig, X_swap, W_swap = [0, 1, 2, 3], [0, -1, -3, -2], [-1, -2, -4, -3]# 提取各个梯度信息并进行坐标转换grads = {"X": np.moveaxis(self.X.detach().numpy(), orig, X_swap),"W": np.moveaxis(self.layer1.weight.detach().numpy(), orig, W_swap),"b": self.layer1.bias.detach().numpy().reshape(1, 1, 1, -1),"y": np.moveaxis(self.Y.detach().numpy(), orig, X_swap),"dLdY": np.moveaxis(self.Y.grad.numpy(), orig, X_swap),"dLdZ": np.moveaxis(self.Z.grad.numpy(), orig, X_swap),"dLdW": np.moveaxis(self.layer1.weight.grad.numpy(), orig, W_swap),"dLdB": self.layer1.bias.grad.numpy().reshape(1, 1, 1, -1),"dLdX": np.moveaxis(self.X.grad.numpy(), orig, X_swap),}# 返回梯度信息字典return grads
class TorchConv1DLayer(nn.Module):# 定义一个继承自 nn.Module 的 TorchConv1DLayer 类def __init__(self, in_channels, out_channels, act_fn, params, hparams, **kwargs):# 初始化函数,接受输入通道数、输出通道数、激活函数、参数、超参数等参数# 调用父类的初始化函数super(TorchConv1DLayer, self).__init__()# 从参数中获取权重 W 和偏置 bW = params["W"]b = params["b"]self.act_fn = act_fn# 创建一个一维卷积层,设置输入通道数、输出通道数、卷积核宽度、填充、步长、膨胀等参数self.layer1 = nn.Conv1d(in_channels,out_channels,hparams["kernel_width"],padding=hparams["pad"],stride=hparams["stride"],dilation=hparams["dilation"] + 1,bias=True,)# 调整权重 W 的维度顺序W = np.moveaxis(W, [0, 1, 2], [-1, -2, -3])# 断言卷积层的权重形状与调整后的 W 的形状相同assert self.layer1.weight.shape == W.shape# 断言卷积层的偏置形状与展平后的 b 的形状相同assert self.layer1.bias.shape == b.flatten().shape# 将调整后的 W 赋值给卷积层的权重self.layer1.weight = nn.Parameter(torch.FloatTensor(W))# 将展平后的 b 赋值给卷积层的偏置self.layer1.bias = nn.Parameter(torch.FloatTensor(b.flatten()))def forward(self, X):# 前向传播函数,接受输入 X# 调整输入 X 的维度顺序self.X = np.moveaxis(X, [0, 1, 2], [0, -1, -2])# 如果输入 X 不是 torch.Tensor 类型,则转换为 torch.Tensor 类型if not isinstance(self.X, torch.Tensor):self.X = torchify(self.X)# 保留输入 X 的梯度信息self.X.retain_grad()# 对输入 X 进行卷积操作,得到 Zself.Z = self.layer1(self.X)# 保留 Z 的梯度信息self.Z.retain_grad()# 对 Z 应用激活函数,得到 Yself.Y = self.act_fn(self.Z)# 保留 Y 的梯度信息self.Y.retain_grad()# 返回 Yreturn self.Y# 提取梯度信息def extract_grads(self, X):# 进行前向传播self.forward(X)# 计算损失值self.loss = self.Y.sum()# 反向传播计算梯度self.loss.backward()# 定义坐标转换规则orig, X_swap, W_swap = [0, 1, 2], [0, -1, -2], [-1, -2, -3]# 提取各个梯度信息并进行坐标转换grads = {"X": np.moveaxis(self.X.detach().numpy(), orig, X_swap),"W": np.moveaxis(self.layer1.weight.detach().numpy(), orig, W_swap),"b": self.layer1.bias.detach().numpy().reshape(1, 1, -1),"y": np.moveaxis(self.Y.detach().numpy(), orig, X_swap),"dLdY": np.moveaxis(self.Y.grad.numpy(), orig, X_swap),"dLdZ": np.moveaxis(self.Z.grad.numpy(), orig, X_swap),"dLdW": np.moveaxis(self.layer1.weight.grad.numpy(), orig, W_swap),"dLdB": self.layer1.bias.grad.numpy().reshape(1, 1, -1),"dLdX": np.moveaxis(self.X.grad.numpy(), orig, X_swap),}# 返回梯度信息字典return grads
# 定义一个 TorchDeconv2DLayer 类,继承自 nn.Module
class TorchDeconv2DLayer(nn.Module):# 初始化函数,接受输入通道数、输出通道数、激活函数、参数、超参数等参数def __init__(self, in_channels, out_channels, act_fn, params, hparams, **kwargs):# 调用父类的初始化函数super(TorchDeconv2DLayer, self).__init__()# 从参数中获取权重和偏置W = params["W"]b = params["b"]self.act_fn = act_fn# 创建一个反卷积层,设置输入通道数、输出通道数、卷积核形状、填充、步幅等参数self.layer1 = nn.ConvTranspose2d(in_channels,out_channels,hparams["kernel_shape"],padding=hparams["pad"],stride=hparams["stride"],dilation=1,bias=True,)# 调整权重的维度顺序,使其与反卷积层的权重维度匹配W = np.moveaxis(W, [0, 1, 2, 3], [-2, -1, -4, -3])# 断言反卷积层的权重形状与调整后的权重形状相同assert self.layer1.weight.shape == W.shape# 断言反卷积层的偏置形状与调整后的偏置形状相同assert self.layer1.bias.shape == b.flatten().shape# 将调整后的权重设置为反卷积层的权重参数self.layer1.weight = nn.Parameter(torch.FloatTensor(W))# 将调整后的偏置设置为反卷积层的偏置参数self.layer1.bias = nn.Parameter(torch.FloatTensor(b.flatten()))# 前向传播函数,接受输入数据 X,返回激活后的输出数据 Ydef forward(self, X):# 调整输入数据的维度顺序,使其与反卷积层的输入数据维度匹配self.X = np.moveaxis(X, [0, 1, 2, 3], [0, -2, -1, -3])# 如果输入数据不是 torch.Tensor 类型,则转换为 torch.Tensor 类型if not isinstance(self.X, torch.Tensor):self.X = torchify(self.X)# 保留输入数据的梯度信息self.X.retain_grad()# 将输入数据传入反卷积层,得到输出数据 Z,并保留输出数据的梯度信息self.Z = self.layer1(self.X)self.Z.retain_grad()# 对输出数据 Z 应用激活函数,得到最终输出数据 Y,并保留输出数据的梯度信息self.Y = self.act_fn(self.Z)self.Y.retain_grad()# 返回最终输出数据 Yreturn self.Y# 提取梯度信息def extract_grads(self, X):# 进行前向传播self.forward(X)# 计算损失值self.loss = self.Y.sum()# 反向传播计算梯度self.loss.backward()# 定义坐标转换规则orig, X_swap, W_swap = [0, 1, 2, 3], [0, -1, -3, -2], [-2, -1, -4, -3]# 提取各个梯度信息并进行坐标转换grads = {"X": np.moveaxis(self.X.detach().numpy(), orig, X_swap),"W": np.moveaxis(self.layer1.weight.detach().numpy(), orig, W_swap),"b": self.layer1.bias.detach().numpy().reshape(1, 1, 1, -1),"y": np.moveaxis(self.Y.detach().numpy(), orig, X_swap),"dLdY": np.moveaxis(self.Y.grad.numpy(), orig, X_swap),"dLdZ": np.moveaxis(self.Z.grad.numpy(), orig, X_swap),"dLdW": np.moveaxis(self.layer1.weight.grad.numpy(), orig, W_swap),"dLdB": self.layer1.bias.grad.numpy().reshape(1, 1, 1, -1),"dLdX": np.moveaxis(self.X.grad.numpy(), orig, X_swap),}# 返回梯度信息字典return grads
# 定义一个继承自 nn.Module 的 TorchLSTMCell 类
class TorchLSTMCell(nn.Module):# 初始化方法,接受输入维度、输出维度、参数字典和其他关键字参数def __init__(self, n_in, n_out, params, **kwargs):# 调用父类的初始化方法super(TorchLSTMCell, self).__init__()# 从参数字典中获取权重矩阵,并转置Wiu = params["Wu"][n_out:, :].TWif = params["Wf"][n_out:, :].TWic = params["Wc"][n_out:, :].TWio = params["Wo"][n_out:, :].T# 将权重矩阵堆叠成输入权重矩阵W_ih = np.vstack([Wiu, Wif, Wic, Wio])# 从参数字典中获取权重矩阵,并转置Whu = params["Wu"][:n_out, :].TWhf = params["Wf"][:n_out, :].TWhc = params["Wc"][:n_out, :].TWho = params["Wo"][:n_out, :].T# 将权重矩阵堆叠成隐藏状态权重矩阵W_hh = np.vstack([Whu, Whf, Whc, Who])# 创建一个 LSTMCell 层,设置输入维度、输出维度和是否包含偏置self.layer1 = nn.LSTMCell(input_size=n_in, hidden_size=n_out, bias=True)# 断言输入权重矩阵的形状与 LSTMCell 层的输入权重矩阵形状相同assert self.layer1.weight_ih.shape == W_ih.shape# 断言隐藏状态权重矩阵的形状与 LSTMCell 层的隐藏状态权重矩阵形状相同assert self.layer1.weight_hh.shape == W_hh.shape# 将输入权重矩阵转换为可训练参数并赋值给 LSTMCell 层的输入权重矩阵self.layer1.weight_ih = nn.Parameter(torch.FloatTensor(W_ih))# 将隐藏状态权重矩阵转换为可训练参数并赋值给 LSTMCell 层的隐藏状态权重矩阵self.layer1.weight_hh = nn.Parameter(torch.FloatTensor(W_hh))# 将偏置参数从参数字典中提取并拼接成一个一维数组b = np.concatenate([params["bu"], params["bf"], params["bc"], params["bo"]], axis=-1).flatten()# 断言输入偏置参数的形状与 LSTMCell 层的输入偏置参数形状相同assert self.layer1.bias_ih.shape == b.shape# 断言隐藏状态偏置参数的形状与 LSTMCell 层的隐藏状态偏置参数形状相同assert self.layer1.bias_hh.shape == b.shape# 将偏置参数转换为可训练参数并赋值给 LSTMCell 层的输入偏置参数self.layer1.bias_ih = nn.Parameter(torch.FloatTensor(b))# 将偏置参数转换为可训练参数并赋值给 LSTMCell 层的隐藏状态偏置参数self.layer1.bias_hh = nn.Parameter(torch.FloatTensor(b))# 定义一个前向传播函数,接受输入 Xdef forward(self, X):# 将输入 X 存储在对象中self.X = X# 如果输入 X 不是 torch.Tensor 类型,则将其转换为 torch.Tensor 类型if not isinstance(self.X, torch.Tensor):self.X = torchify(self.X)# 保留输入 X 的梯度信息self.X.retain_grad()# 初始化隐藏状态为 0n_ex, n_in, n_timesteps = self.X.shapen_out, n_out = self.layer1.weight_hh.shape# 初始化隐藏状态 a0 和 c0a0 = torchify(np.zeros((n_ex, n_out)))c0 = torchify(np.zeros((n_ex, n_out)))a0.retain_grad()c0.retain_grad()# 执行前向传播A, C = [], []at = a0ct = c0for t in range(n_timesteps):A.append(at)C.append(ct)at1, ct1 = self.layer1(self.X[:, :, t], (at, ct))at.retain_grad()ct.retain_grad()at = at1ct = ct1at.retain_grad()ct.retain_grad()A.append(at)C.append(ct)# 不包括 a0 在输出中self.A = A[1:]self.C = C[1:]# 返回隐藏状态 A 和 Creturn self.A, self.C
class TorchRNNCell(nn.Module):
    def __init__(self, n_in, n_hid, params, **kwargs):
        super(TorchRNNCell, self).__init__()

        self.layer1 = nn.RNNCell(n_in, n_hid, bias=True, nonlinearity="tanh")

        # Set the weights and bias to match the numpy-ml RNNCell parameters.
        # NB: we pass the *transpose* of the weights to PyTorch, meaning we'll
        # need to check against the *transpose* of our outputs for any function
        # of the weights
        self.layer1.weight_ih = nn.Parameter(torch.FloatTensor(params["Wax"].T))
        self.layer1.weight_hh = nn.Parameter(torch.FloatTensor(params["Waa"].T))
        self.layer1.bias_ih = nn.Parameter(torch.FloatTensor(params["bx"].T))
        self.layer1.bias_hh = nn.Parameter(torch.FloatTensor(params["ba"].T))

    def forward(self, X):
        self.X = X
        if not isinstance(self.X, torch.Tensor):
            self.X = torchify(self.X)

        self.X.retain_grad()

        # Initial hidden state is zero
        n_ex, n_in, n_timesteps = self.X.shape
        n_out, n_out = self.layer1.weight_hh.shape

        a0 = torchify(np.zeros((n_ex, n_out)))
        a0.retain_grad()

        # Run the cell over each timestep, retaining gradients on every state
        A = []
        at = a0
        for t in range(n_timesteps):
            A += [at]
            at1 = self.layer1(self.X[:, :, t], at)
            at.retain_grad()
            at = at1

        at.retain_grad()
        A += [at]

        # Don't include a0 in the output
        self.A = A[1:]
        return self.A

    def extract_grads(self, X):
        self.forward(X)
        self.loss = torch.stack(self.A).sum()
        self.loss.backward()
        grads = {
            "X": self.X.detach().numpy(),
            "ba": self.layer1.bias_hh.detach().numpy(),
            "bx": self.layer1.bias_ih.detach().numpy(),
            "Wax": self.layer1.weight_ih.detach().numpy(),
            "Waa": self.layer1.weight_hh.detach().numpy(),
            "y": torch.stack(self.A).detach().numpy(),
            "dLdA": np.array([a.grad.numpy() for a in self.A]),
            "dLdWaa": self.layer1.weight_hh.grad.numpy(),
            "dLdWax": self.layer1.weight_ih.grad.numpy(),
            "dLdBa": self.layer1.bias_hh.grad.numpy(),
            "dLdBx": self.layer1.bias_ih.grad.numpy(),
            "dLdX": self.X.grad.numpy(),
        }
        return grads

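def _example_torch_rnn_cell():
    # Hypothetical usage sketch (not part of the original test suite): extract the
    # PyTorch gold-standard gradients for numpy-ml-style RNN cell parameters.
    # 1D bias vectors are used here so the transposes in __init__ are no-ops.
    n_ex, n_in, n_hid, n_t = 2, 3, 4, 5
    params = {
        "Wax": np.random.randn(n_in, n_hid),
        "Waa": np.random.randn(n_hid, n_hid),
        "bx": np.random.randn(n_hid),
        "ba": np.random.randn(n_hid),
    }
    cell = TorchRNNCell(n_in, n_hid, params)
    grads = cell.extract_grads(np.random.randn(n_ex, n_in, n_t))
    return grads
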
class TorchFCLayer(nn.Module):
    def __init__(self, n_in, n_hid, act_fn, params, **kwargs):
        super(TorchFCLayer, self).__init__()
        self.layer1 = nn.Linear(n_in, n_hid)

        # Explicitly set the weights and bias.
        # NB: we pass the *transpose* of the weights to PyTorch, meaning we'll
        # need to check against the *transpose* of our outputs for any function
        # of the weights
        self.layer1.weight = nn.Parameter(torch.FloatTensor(params["W"].T))
        self.layer1.bias = nn.Parameter(torch.FloatTensor(params["b"]))

        self.act_fn = act_fn
        self.model = nn.Sequential(self.layer1, self.act_fn)

    def forward(self, X):
        self.X = X
        if not isinstance(X, torch.Tensor):
            self.X = torchify(X)

        self.z1 = self.layer1(self.X)
        self.z1.retain_grad()

        self.out1 = self.act_fn(self.z1)
        self.out1.retain_grad()

    def extract_grads(self, X):
        self.forward(X)
        self.loss1 = self.out1.sum()
        self.loss1.backward()
        grads = {
            "X": self.X.detach().numpy(),
            "b": self.layer1.bias.detach().numpy(),
            "W": self.layer1.weight.detach().numpy(),
            "y": self.out1.detach().numpy(),
            "dLdy": self.out1.grad.numpy(),
            "dLdZ": self.z1.grad.numpy(),
            "dLdB": self.layer1.bias.grad.numpy(),
            "dLdW": self.layer1.weight.grad.numpy(),
            "dLdX": self.X.grad.numpy(),
        }
        return grads


class TorchEmbeddingLayer(nn.Module):
    def __init__(self, vocab_size, n_out, params, **kwargs):
        super(TorchEmbeddingLayer, self).__init__()
        self.layer1 = nn.Embedding(vocab_size, n_out)

        # Explicitly set the embedding weights
        self.layer1.weight = nn.Parameter(torch.FloatTensor(params["W"]))
        self.model = nn.Sequential(self.layer1)

    def forward(self, X):
        self.X = X
        if not isinstance(X, torch.Tensor):
            self.X = torch.from_numpy(X)

        self.out1 = self.layer1(self.X)
        self.out1.retain_grad()

    def extract_grads(self, X):
        self.forward(X)
        self.loss1 = self.out1.sum()
        self.loss1.backward()
        grads = {
            "X": self.X.detach().numpy(),
            "W": self.layer1.weight.detach().numpy(),
            "y": self.out1.detach().numpy(),
            "dLdy": self.out1.grad.numpy(),
            "dLdW": self.layer1.weight.grad.numpy(),
        }
        return grads

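def _example_torch_fc_layer():
    # Hypothetical usage sketch (not part of the original test suite): compare a
    # numpy-ml fully-connected layer against the PyTorch gold standard defined in
    # TorchFCLayer above, using a tanh activation.
    n_ex, n_in, n_hid = 4, 3, 5
    params = {"W": np.random.randn(n_in, n_hid), "b": np.random.randn(n_hid)}
    layer = TorchFCLayer(n_in, n_hid, nn.Tanh(), params)
    grads = layer.extract_grads(np.random.randn(n_ex, n_in))
    return grads
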
class TorchSDPAttentionLayer(nn.Module):
    def __init__(self):
        super(TorchSDPAttentionLayer, self).__init__()

    def forward(self, Q, K, V, mask=None):
        self.Q = Q
        self.K = K
        self.V = V

        if not isinstance(self.Q, torch.Tensor):
            self.Q = torchify(self.Q)
        if not isinstance(self.K, torch.Tensor):
            self.K = torchify(self.K)
        if not isinstance(self.V, torch.Tensor):
            self.V = torchify(self.V)

        self.Q.retain_grad()
        self.K.retain_grad()
        self.V.retain_grad()

        self.d_k = self.Q.size(-1)

        # Scaled dot-product attention scores
        self.scores = torch.matmul(self.Q, self.K.transpose(-2, -1)) / np.sqrt(self.d_k)
        # Mask out disallowed positions with a large negative value
        if mask is not None:
            self.scores = self.scores.masked_fill(mask == 0, -1e9)
        self.scores.retain_grad()

        self.weights = F.softmax(self.scores, dim=-1)
        self.weights.retain_grad()

        self.Y = torch.matmul(self.weights, self.V)
        self.Y.retain_grad()
        return self.Y, self.weights

    def extract_grads(self, Q, K, V, mask=None):
        self.forward(Q, K, V, mask=mask)
        self.loss1 = self.Y.sum()
        self.loss1.backward()
        grads = {
            "Q": self.Q.detach().numpy(),
            "K": self.K.detach().numpy(),
            "V": self.V.detach().numpy(),
            "d_k": self.d_k,
            "scores": self.scores.detach().numpy(),
            "weights": self.weights.detach().numpy(),
            "Y": self.Y.detach().numpy(),
            "dLdV": self.V.grad.numpy(),
            "dWeights": self.weights.grad.numpy(),
            "dScores": self.scores.grad.numpy(),
            "dLdQ": self.Q.grad.numpy(),
            "dLdK": self.K.grad.numpy(),
        }
        return grads


class TorchMultiHeadedAttentionModule(nn.Module):
    def __init__(self, params, hparams):
        super(TorchMultiHeadedAttentionModule, self).__init__()

        # The query/key/value dimension must be divisible by the number of heads
        assert hparams["kqv_dim"] % hparams["n_heads"] == 0
        self.n_heads = hparams["n_heads"]
        self.latent_dim = hparams["kqv_dim"] // hparams["n_heads"]
        self.p_dropout = hparams["dropout_p"]

        # Linear projections for the queries, keys, values, and output
        self.projections = {
            "Q": nn.Linear(hparams["kqv_dim"], hparams["kqv_dim"]),
            "K": nn.Linear(hparams["kqv_dim"], hparams["kqv_dim"]),
            "V": nn.Linear(hparams["kqv_dim"], hparams["kqv_dim"]),
            "O": nn.Linear(hparams["kqv_dim"], hparams["kqv_dim"]),
        }

        # Explicitly set the projection weights and biases from the numpy-ml params
        self.projections["Q"].weight = nn.Parameter(
            torch.FloatTensor(params["components"]["Q"]["W"].T)
        )
        self.projections["Q"].bias = nn.Parameter(
            torch.FloatTensor(params["components"]["Q"]["b"])
        )
        self.projections["K"].weight = nn.Parameter(
            torch.FloatTensor(params["components"]["K"]["W"].T)
        )
        self.projections["K"].bias = nn.Parameter(
            torch.FloatTensor(params["components"]["K"]["b"])
        )
        self.projections["V"].weight = nn.Parameter(
            torch.FloatTensor(params["components"]["V"]["W"].T)
        )
        self.projections["V"].bias = nn.Parameter(
            torch.FloatTensor(params["components"]["V"]["b"])
        )
        self.projections["O"].weight = nn.Parameter(
            torch.FloatTensor(params["components"]["O"]["W"].T)
        )
        self.projections["O"].bias = nn.Parameter(
            torch.FloatTensor(params["components"]["O"]["b"])
        )

        self.attn = None
        self.dropout = nn.Dropout(p=hparams["dropout_p"])

    def forward(self, Q, K, V, mask=None):
        self.Q = Q
        self.K = K
        self.V = V

        if not isinstance(self.Q, torch.Tensor):
            self.Q = torchify(self.Q)
        if not isinstance(self.K, torch.Tensor):
            self.K = torchify(self.K)
        if not isinstance(self.V, torch.Tensor):
            self.V = torchify(self.V)

        self.Q.retain_grad()
        self.K.retain_grad()
        self.V.retain_grad()

        # The same mask is applied to all heads
        if mask is not None:
            mask = mask.unsqueeze(1)
        n_ex = self.Q.size(0)

        # 1) Apply the linear projections, then split into heads:
        #    (n_ex, seq_len, kqv_dim) -> (n_ex, n_heads, seq_len, latent_dim)
        self.Q_proj = (
            self.projections["Q"](self.Q)
            .view(n_ex, -1, self.n_heads, self.latent_dim)
            .transpose(1, 2)
        )
        self.K_proj = (
            self.projections["K"](self.K)
            .view(n_ex, -1, self.n_heads, self.latent_dim)
            .transpose(1, 2)
        )
        self.V_proj = (
            self.projections["V"](self.V)
            .view(n_ex, -1, self.n_heads, self.latent_dim)
            .transpose(1, 2)
        )

        self.Q_proj.retain_grad()
        self.K_proj.retain_grad()
        self.V_proj.retain_grad()

        # 2) Apply scaled dot-product attention to all projected vectors in the batch
        self.attn_out, self.attn = TorchSDPAttentionLayer().forward(
            self.Q_proj, self.K_proj, self.V_proj, mask=mask
        )
        self.attn.retain_grad()
        self.attn_out.retain_grad()

        # 3) "Concatenate" the heads using a view and apply the final linear projection
        self.attn_out_reshaped = (
            self.attn_out.transpose(1, 2)
            .contiguous()
            .view(n_ex, -1, self.n_heads * self.latent_dim)
        )
        self.attn_out_reshaped.retain_grad()
        print(self.attn_out_reshaped.shape)

        self.Y = self.projections["O"](self.attn_out_reshaped)
        print(self.Y.shape)
        self.Y.retain_grad()

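def _example_torch_sdp_attention():
    # Hypothetical usage sketch (not part of the original test suite): extract the
    # PyTorch gold-standard gradients for unmasked scaled dot-product attention
    # using TorchSDPAttentionLayer above.
    n_ex, seq_len, d_k = 2, 4, 8
    Q = np.random.randn(n_ex, seq_len, d_k)
    K = np.random.randn(n_ex, seq_len, d_k)
    V = np.random.randn(n_ex, seq_len, d_k)
    grads = TorchSDPAttentionLayer().extract_grads(Q, K, V)
    return grads
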
# Global registries for shared TensorFlow parameters and parameter aliases
_params = {}
_param_aliases = {}

def param(name, *args, **kwargs):
    """
    A wrapper for `tf.Variable` which enables parameter sharing in models.

    Creates and returns variables just like `tf.Variable`, except that if you
    try to create a param with the same name as a previously-created one,
    `param(...)` will return the old one instead of making a new one.

    This constructor also adds a `param` attribute to the variables it
    creates, so that you can easily search a graph for all params.
    """
    # Create the variable the first time this name is seen; afterwards, reuse it
    if name not in _params:
        kwargs["name"] = name
        param = tf.Variable(*args, **kwargs)
        param.param = True
        _params[name] = param
    result = _params[name]
    i = 0
    # Follow any chain of aliases to the canonical parameter
    while result in _param_aliases:
        i += 1
        result = _param_aliases[result]
    return result

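def _example_param_sharing():
    # Hypothetical sketch (not part of the original test suite): calling `param`
    # twice with the same name returns the same underlying tf.Variable, which is
    # how the Generator / Discriminator functions below share weights across calls.
    w1 = param("Example.W", np.zeros((3, 4), dtype="float32"))
    w2 = param("Example.W", np.ones((3, 4), dtype="float32"))
    assert w1 is w2
    return w1
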
def params_with_name(name):
    """Return all previously-created params whose name contains `name`."""
    return [p for n, p in _params.items() if name in n]

def ReLULayer(name, n_in, n_out, inputs, w_initialization):
    """A fully-connected layer with a ReLU nonlinearity.

    Assumes `w_initialization` is an ndarray of weight values.
    """
    if isinstance(w_initialization, np.ndarray):
        weight_values = w_initialization.astype("float32")

    # Create (or reuse) the weight parameter and apply the affine transform
    W = param(name + ".W", weight_values)
    result = tf.matmul(inputs, W)
    output = tf.nn.bias_add(
        result, param(name + ".b", np.zeros((n_out,), dtype="float32"))
    )
    output = tf.nn.relu(output)
    return output, W

def LinearLayer(name, n_in, n_out, inputs, w_initialization):
    """A fully-connected layer with no nonlinearity.

    Assumes `w_initialization` is an ndarray of weight values.
    """
    if isinstance(w_initialization, np.ndarray):
        weight_values = w_initialization.astype("float32")

    # Create (or reuse) the weight parameter and apply the affine transform
    W = param(name + ".W", weight_values)
    result = tf.matmul(inputs, W)
    output = tf.nn.bias_add(
        result, param(name + ".b", np.zeros((n_out,), dtype="float32"))
    )
    return output, W

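def _example_relu_layer():
    # Hypothetical sketch (not part of the original test suite): run ReLULayer in
    # TF1-style graph mode with an explicit weight initialization, mirroring how
    # the WGAN-GP helpers below build their graphs.
    tf.compat.v1.disable_eager_execution()
    X = tf.compat.v1.placeholder(tf.float32, shape=[None, 3])
    out, W = ReLULayer("ExampleReLU.1", 3, 4, X, w_initialization=np.random.randn(3, 4))
    with tf.compat.v1.Session() as sess:
        sess.run(tf.compat.v1.global_variables_initializer())
        y = sess.run(out, feed_dict={X: np.random.randn(2, 3)})
    return y
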
def Generator(n_samples, X_real, params=None):
    """WGAN-GP generator: a 4-layer MLP mapping noise to fake samples."""
    n_feats = 2
    # Weight initializations; overridden below when `params` is provided
    W1 = W2 = W3 = W4 = "he"

    # Sample the generator noise
    noise = tf.random.normal([n_samples, 2])
    if params is not None:
        noise = tf.convert_to_tensor(params["noise"], dtype="float32")
        W1 = params["generator"]["FC1"]["W"]
        W2 = params["generator"]["FC2"]["W"]
        W3 = params["generator"]["FC3"]["W"]
        W4 = params["generator"]["FC4"]["W"]
        DIM = params["g_hidden"]
        n_feats = params["n_in"]

    outs = {}
    weights = {}

    # Three ReLU layers followed by a linear output layer
    output, W = ReLULayer("Generator.1", n_feats, DIM, noise, w_initialization=W1)
    outs["FC1"] = output
    weights["FC1"] = W

    output, W = ReLULayer("Generator.2", DIM, DIM, output, w_initialization=W2)
    outs["FC2"] = output
    weights["FC2"] = W

    output, W = ReLULayer("Generator.3", DIM, DIM, output, w_initialization=W3)
    outs["FC3"] = output
    weights["FC3"] = W

    output, W = LinearLayer("Generator.4", DIM, n_feats, output, w_initialization=W4)
    outs["FC4"] = output
    weights["FC4"] = W

    return output, outs, weights

def Discriminator(inputs, params=None):
    """WGAN-GP critic: a 4-layer MLP producing a scalar score for each input."""
    n_feats = 2
    # Weight initializations; overridden below when `params` is provided
    W1 = W2 = W3 = W4 = "he"

    if params is not None:
        W1 = params["critic"]["FC1"]["W"]
        W2 = params["critic"]["FC2"]["W"]
        W3 = params["critic"]["FC3"]["W"]
        W4 = params["critic"]["FC4"]["W"]
        DIM = params["g_hidden"]
        n_feats = params["n_in"]

    outs = {}
    weights = {}

    # Three ReLU layers followed by a linear output layer
    output, W = ReLULayer("Discriminator.1", n_feats, DIM, inputs, w_initialization=W1)
    outs["FC1"] = output
    weights["FC1"] = W

    output, W = ReLULayer("Discriminator.2", DIM, DIM, output, w_initialization=W2)
    outs["FC2"] = output
    weights["FC2"] = W

    output, W = ReLULayer("Discriminator.3", DIM, DIM, output, w_initialization=W3)
    outs["FC3"] = output
    weights["FC3"] = W

    output, W = LinearLayer("Discriminator.4", DIM, 1, output, w_initialization=W4)
    outs["FC4"] = output
    weights["FC4"] = W

    # Collect the bias variables created by `param` for each critic layer
    for var in params_with_name("Discriminator"):
        if "1.b:" in var.name:
            weights["FC1_b"] = var
        elif "2.b:" in var.name:
            weights["FC2_b"] = var
        elif "3.b:" in var.name:
            weights["FC3_b"] = var
        elif "4.b:" in var.name:
            weights["FC4_b"] = var

    # Flatten the critic scores to a 1D vector
    return tf.reshape(output, [-1]), outs, weights

def WGAN_GP_tf(X, lambda_, params, batch_size):
    """Assemble the TensorFlow graph for a WGAN-GP step on the batch `X`."""
    tf.compat.v1.disable_eager_execution()

    # Use the batch size of the provided data
    batch_size = X.shape[0]

    # Hyperparameters
    n_steps = params["n_steps"]
    c_updates_per_epoch = params["c_updates_per_epoch"]
    alpha = tf.convert_to_tensor(params["alpha"], dtype="float32")

    # Placeholder for the real data
    X_real = tf.compat.v1.placeholder(tf.float32, shape=[None, params["n_in"]])

    # Generator samples and critic scores for the real and fake data
    X_fake, G_out_X_fake, G_weights = Generator(batch_size, X_real, params)
    Y_real, C_out_Y_real, C_Y_real_weights = Discriminator(X_real, params)
    Y_fake, C_out_Y_fake, C_Y_fake_weights = Discriminator(X_fake, params)

    # WGAN losses
    mean_fake = tf.reduce_mean(Y_fake)
    mean_real = tf.reduce_mean(Y_real)
    C_loss = tf.reduce_mean(Y_fake) - tf.reduce_mean(Y_real)
    G_loss = -tf.reduce_mean(Y_fake)

    # WGAN gradient penalty on the interpolated samples
    X_interp = alpha * X_real + ((1 - alpha) * X_fake)
    Y_interp, C_out_Y_interp, C_Y_interp_weights = Discriminator(X_interp, params)
    gradInterp = tf.gradients(Y_interp, [X_interp])[0]
    norm_gradInterp = tf.sqrt(
        tf.compat.v1.reduce_sum(tf.square(gradInterp), reduction_indices=[1])
    )
    gradient_penalty = tf.reduce_mean((norm_gradInterp - 1) ** 2)
    C_loss += lambda_ * gradient_penalty

    # Gradients of the interpolated critic score wrt. each critic layer output
    C_bwd_Y_interp = {}
    for k, v in C_out_Y_interp.items():
        C_bwd_Y_interp[k] = tf.gradients(Y_interp, [v])[0]

    # Gradients of the critic loss wrt. each critic weight
    C_bwd_W = {}
    for k, v in C_Y_interp_weights.items():
        C_bwd_W[k] = tf.gradients(C_loss, [v])[0]

    # Remaining gradients of the critic and generator losses
    dC_Y_fake = tf.gradients(C_loss, [Y_fake])[0]
    dC_Y_real = tf.gradients(C_loss, [Y_real])[0]
    dC_gradInterp = tf.gradients(C_loss, [gradInterp])[0]
    dG_Y_fake = tf.gradients(G_loss, [Y_fake])[0]

    # NB: the original (truncated) source returned an undefined `grads`; here the
    # symbolic gradients computed above are collected into a dict so the function
    # returns something well-defined
    grads = {
        "C_bwd_Y_interp": C_bwd_Y_interp,
        "C_bwd_W": C_bwd_W,
        "dC_Y_fake": dC_Y_fake,
        "dC_Y_real": dC_Y_real,
        "dC_gradInterp": dC_gradInterp,
        "dG_Y_fake": dG_Y_fake,
    }
    return grads

def TFNCELoss(X, target_word, L):
    """Compute the TensorFlow NCE loss and its gradients for the numpy-ml layer `L`."""
    from tensorflow.python.ops.nn_impl import _compute_sampled_logits
    from tensorflow.python.ops.nn_impl import sigmoid_cross_entropy_with_logits

    tf.compat.v1.disable_eager_execution()

    # Placeholders for the inputs, layer parameters, and noise samples
    in_embed = tf.compat.v1.placeholder(tf.float32, shape=X.shape)
    in_bias = tf.compat.v1.placeholder(tf.float32, shape=L.parameters["b"].flatten().shape)
    in_weights = tf.compat.v1.placeholder(tf.float32, shape=L.parameters["W"].shape)
    in_target_word = tf.compat.v1.placeholder(tf.int64)
    in_neg_samples = tf.compat.v1.placeholder(tf.int32)
    in_target_prob = tf.compat.v1.placeholder(tf.float32)
    in_neg_samp_prob = tf.compat.v1.placeholder(tf.float32)

    # Feed dict mapping the layer's parameters and noise samples to the placeholders
    feed = {
        in_embed: X,
        in_weights: L.parameters["W"],
        in_target_word: target_word,
        in_bias: L.parameters["b"].flatten(),
        in_neg_samples: L.derived_variables["noise_samples"][0],
        in_target_prob: L.derived_variables["noise_samples"][1],
        in_neg_samp_prob: L.derived_variables["noise_samples"][2],
    }

    # NCE loss using the layer's own noise samples as the sampled values
    nce_unreduced = tf.nn.nce_loss(
        weights=in_weights,
        biases=in_bias,
        labels=in_target_word,
        inputs=in_embed,
        sampled_values=(in_neg_samples, in_target_prob, in_neg_samp_prob),
        num_sampled=L.num_negative_samples,
        num_classes=L.n_classes,
    )

    # Total loss and its gradients wrt. the weights, bias, and inputs
    loss = tf.reduce_sum(nce_unreduced)
    dLdW = tf.gradients(loss, [in_weights])[0]
    dLdb = tf.gradients(loss, [in_bias])[0]
    dLdX = tf.gradients(loss, [in_embed])[0]

    # Also recover the sampled logits / labels that underlie the NCE loss
    sampled_logits, sampled_labels = _compute_sampled_logits(
        weights=in_weights,
        biases=in_bias,
        labels=in_target_word,
        inputs=in_embed,
        sampled_values=(in_neg_samples, in_target_prob, in_neg_samp_prob),
        num_sampled=L.num_negative_samples,
        num_classes=L.n_classes,
        num_true=1,
        subtract_log_q=True,
    )
    sampled_losses = sigmoid_cross_entropy_with_logits(
        labels=sampled_labels, logits=sampled_logits
    )

    # Evaluate everything in a single session run
    with tf.compat.v1.Session() as session:
        session.run(tf.compat.v1.global_variables_initializer())
        (
            _final_loss,
            _nce_unreduced,
            _dLdW,
            _dLdb,
            _dLdX,
            _sampled_logits,
            _sampled_labels,
            _sampled_losses,
        ) = session.run(
            [
                loss,
                nce_unreduced,
                dLdW,
                dLdb,
                dLdX,
                sampled_logits,
                sampled_labels,
                sampled_losses,
            ],
            feed_dict=feed,
        )

    tf.compat.v1.reset_default_graph()

    return {
        "final_loss": _final_loss,
        "nce_unreduced": _nce_unreduced,
        "dLdW": _dLdW,
        "dLdb": _dLdb,
        "dLdX": _dLdX,
        "out_logits": _sampled_logits,
        "out_labels": _sampled_labels,
        "sampled_loss": _sampled_losses,
    }