김성주

Code and final report

from vocabularies import VocabType
from config import Config
from interactive_predict import InteractivePredictor
from model_base import Code2VecModelBase
def load_model_dynamically(config: Config) -> Code2VecModelBase:
assert config.DL_FRAMEWORK in {'tensorflow', 'keras'}
if config.DL_FRAMEWORK == 'tensorflow':
from tensorflow_model import Code2VecModel
elif config.DL_FRAMEWORK == 'keras':
from keras_model import Code2VecModel
return Code2VecModel(config)
if __name__ == '__main__':
config = Config(set_defaults=True, load_from_args=True, verify=True)
model = load_model_dynamically(config)
if config.is_training:
model.train()
if config.SAVE_W2V is not None:
model.save_word2vec_format(config.SAVE_W2V, VocabType.Token)
config.log('Origin word vectors saved in word2vec text format in: %s' % config.SAVE_W2V)
if config.SAVE_T2V is not None:
model.save_word2vec_format(config.SAVE_T2V, VocabType.Target)
config.log('Target word vectors saved in word2vec text format in: %s' % config.SAVE_T2V)
if (config.is_testing and not config.is_training) or config.RELEASE:
eval_results = model.evaluate()
if eval_results is not None:
config.log(
str(eval_results).replace('topk', 'top{}'.format(config.TOP_K_WORDS_CONSIDERED_DURING_PREDICTION)))
if config.PREDICT:
predictor = InteractivePredictor(config, model)
predictor.predict()
model.close_session()
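# Typical invocations (a sketch: --data/--save/--test mirror the train.sh script later in this report;
# --load/--predict are assumed from the Config flags used above, config.py itself is not shown):
#   python -u code2vec.py --data data/dataset/dataset --test data/dataset/dataset.val.c2v --save models/python/saved_model
#   python code2vec.py --load models/python/saved_model --predict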
import re
import numpy as np
import tensorflow as tf
from itertools import takewhile, repeat
from typing import List, Optional, Tuple, Iterable
from datetime import datetime
from collections import OrderedDict
class common:
@staticmethod
def normalize_word(word):
stripped = re.sub(r'[^a-zA-Z]', '', word)
if len(stripped) == 0:
return word.lower()
else:
return stripped.lower()
@staticmethod
def _load_vocab_from_histogram(path, min_count=0, start_from=0, return_counts=False):
with open(path, 'r') as file:
word_to_index = {}
index_to_word = {}
word_to_count = {}
next_index = start_from
for line in file:
line_values = line.rstrip().split(' ')
if len(line_values) != 2:
continue
word = line_values[0]
count = int(line_values[1])
if count < min_count:
continue
if word in word_to_index:
continue
word_to_index[word] = next_index
index_to_word[next_index] = word
word_to_count[word] = count
next_index += 1
result = word_to_index, index_to_word, next_index - start_from
if return_counts:
result = (*result, word_to_count)
return result
@staticmethod
def load_vocab_from_histogram(path, min_count=0, start_from=0, max_size=None, return_counts=False):
if max_size is not None:
word_to_index, index_to_word, next_index, word_to_count = \
common._load_vocab_from_histogram(path, min_count, start_from, return_counts=True)
if next_index <= max_size:
results = (word_to_index, index_to_word, next_index)
if return_counts:
results = (*results, word_to_count)
return results
# Take min_count to be one plus the count of the max_size'th word
min_count = sorted(word_to_count.values(), reverse=True)[max_size] + 1
return common._load_vocab_from_histogram(path, min_count, start_from, return_counts)
@staticmethod
def load_json(json_file):
data = []
with open(json_file, 'r') as file:
for line in file:
current_program = common.process_single_json_line(line)
if current_program is None:
continue
for element, scope in current_program.items():
data.append((element, scope))
return data
@staticmethod
def load_json_streaming(json_file):
with open(json_file, 'r') as file:
for line in file:
current_program = common.process_single_json_line(line)
if current_program is None:
continue
for element, scope in current_program.items():
yield (element, scope)
@staticmethod
def save_word2vec_file(output_file, index_to_word, vocab_embedding_matrix: np.ndarray):
assert len(vocab_embedding_matrix.shape) == 2
vocab_size, embedding_dimension = vocab_embedding_matrix.shape
output_file.write('%d %d\n' % (vocab_size, embedding_dimension))
for word_idx in range(0, vocab_size):
assert word_idx in index_to_word
word_str = index_to_word[word_idx]
output_file.write(word_str + ' ')
output_file.write(' '.join(map(str, vocab_embedding_matrix[word_idx])) + '\n')
@staticmethod
def calculate_max_contexts(file):
contexts_per_word = common.process_test_input(file)
return max(
[max(l, default=0) for l in [[len(contexts) for contexts in prog.values()] for prog in contexts_per_word]],
default=0)
@staticmethod
def binary_to_string(binary_string):
return binary_string.decode("utf-8")
@staticmethod
def binary_to_string_list(binary_string_list):
return [common.binary_to_string(w) for w in binary_string_list]
@staticmethod
def binary_to_string_matrix(binary_string_matrix):
return [common.binary_to_string_list(l) for l in binary_string_matrix]
@staticmethod
def load_file_lines(path):
with open(path, 'r') as f:
return f.read().splitlines()
@staticmethod
def split_to_batches(data_lines, batch_size):
for x in range(0, len(data_lines), batch_size):
yield data_lines[x:x + batch_size]
@staticmethod
def legal_method_names_checker(special_words, name):
return name != special_words.OOV and re.match(r'^[a-zA-Z_|]+[a-zA-Z_]+[a-zA-Z0-9_]+$', name)
@staticmethod
def filter_impossible_names(special_words, top_words):
result = list(filter(lambda word: common.legal_method_names_checker(special_words, word), top_words))
return result
@staticmethod
def get_subtokens(str):
return str.split('|')
@staticmethod
def parse_prediction_results(raw_prediction_results, unhash_dict, special_words, topk: int = 5) -> List['MethodPredictionResults']:
prediction_results = []
for single_method_prediction in raw_prediction_results:
current_method_prediction_results = MethodPredictionResults(single_method_prediction.original_name)
for i, predicted in enumerate(single_method_prediction.topk_predicted_words):
if predicted == special_words.OOV:
continue
suggestion_subtokens = common.get_subtokens(predicted)
current_method_prediction_results.append_prediction(
suggestion_subtokens, single_method_prediction.topk_predicted_words_scores[i].item())
topk_attention_per_context = [
(key, single_method_prediction.attention_per_context[key])
for key in sorted(single_method_prediction.attention_per_context,
key=single_method_prediction.attention_per_context.get, reverse=True)
][:topk]
for context, attention in topk_attention_per_context:
token1, hashed_path, token2 = context
if hashed_path in unhash_dict:
unhashed_path = unhash_dict[hashed_path]
current_method_prediction_results.append_attention_path(attention.item(), token1=token1,
path=unhashed_path, token2=token2)
prediction_results.append(current_method_prediction_results)
return prediction_results
@staticmethod
def tf_get_first_true(bool_tensor: tf.Tensor) -> tf.Tensor:
bool_tensor_as_int32 = tf.cast(bool_tensor, dtype=tf.int32)
cumsum = tf.cumsum(bool_tensor_as_int32, axis=-1, exclusive=False)
return tf.logical_and(tf.equal(cumsum, 1), bool_tensor)
@staticmethod
def count_lines_in_file(file_path: str):
with open(file_path, 'rb') as f:
bufgen = takewhile(lambda x: x, (f.raw.read(1024 * 1024) for _ in repeat(None)))
return sum(buf.count(b'\n') for buf in bufgen)
@staticmethod
def squeeze_single_batch_dimension_for_np_arrays(arrays):
assert all(array is None or isinstance(array, np.ndarray) or isinstance(array, tf.Tensor) for array in arrays)
return tuple(
None if array is None else np.squeeze(array, axis=0)
for array in arrays
)
@staticmethod
def get_first_match_word_from_top_predictions(special_words, original_name, top_predicted_words) -> Optional[Tuple[int, str]]:
normalized_original_name = common.normalize_word(original_name)
for suggestion_idx, predicted_word in enumerate(common.filter_impossible_names(special_words, top_predicted_words)):
normalized_possible_suggestion = common.normalize_word(predicted_word)
if normalized_original_name == normalized_possible_suggestion:
return suggestion_idx, predicted_word
return None
@staticmethod
def now_str():
return datetime.now().strftime("%Y%m%d-%H%M%S: ")
@staticmethod
def chunks(l, n):
"""Yield successive n-sized chunks from l."""
for i in range(0, len(l), n):
yield l[i:i + n]
@staticmethod
def get_unique_list(lst: Iterable) -> list:
return list(OrderedDict(((item, 0) for item in lst)).keys())
class MethodPredictionResults:
def __init__(self, original_name):
self.original_name = original_name
self.predictions = list()
self.attention_paths = list()
def append_prediction(self, name, probability):
self.predictions.append({'name': name, 'probability': probability})
def append_attention_path(self, attention_score, token1, path, token2):
self.attention_paths.append({'score': attention_score,
'path': path,
'token1': token1,
'token2': token2})
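# A minimal usage sketch of the helpers above (file paths are illustrative, not part of the project):
'''
Usage
word_to_index, index_to_word, vocab_size = common.load_vocab_from_histogram('data/dataset/dataset.histo.ori.c2v', max_size=1000)
print(common.normalize_word('Foo_Bar3'))        # -> 'foobar'
print(list(common.chunks([1, 2, 3, 4, 5], 2)))  # -> [[1, 2], [3, 4], [5]]
print(common.get_unique_list(['a', 'b', 'a']))  # -> ['a', 'b']
'''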
import traceback
from common import common
from py_extractor import PyExtractor
SHOW_TOP_CONTEXTS = 10
MAX_PATH_LENGTH = 8
MAX_PATH_WIDTH = 2
input_filename = 'test.c2v'
class InteractivePredictor:
exit_keywords = ['exit', 'quit', 'q']
def __init__(self, config, model):
model.predict([])
self.model = model
self.config = config
self.path_extractor = PyExtractor(config)
def predict(self):
print('Starting interactive prediction...')
while True:
print('Modify the file: "%s" and press any key when ready, or "q" / "quit" / "exit" to exit' % input_filename)
user_input = input()
if user_input.lower() in self.exit_keywords:
print('Exiting...')
return
try:
predict_lines, hash_to_string_dict = self.path_extractor.extract_paths(input_filename)
except ValueError as e:
print(e)
continue
raw_prediction_results = self.model.predict(predict_lines)
method_prediction_results = common.parse_prediction_results(
raw_prediction_results, hash_to_string_dict,
self.model.vocabs.target_vocab.special_words, topk=SHOW_TOP_CONTEXTS)
for raw_prediction, method_prediction in zip(raw_prediction_results, method_prediction_results):
print('Original name:\t' + method_prediction.original_name)
for name_prob_pair in method_prediction.predictions:
print('\t(%f) predicted: %s' % (name_prob_pair['probability'], name_prob_pair['name']))
print('Attention:')
for attention_obj in method_prediction.attention_paths:
print('%f\tcontext: %s,%s,%s' % (
attention_obj['score'], attention_obj['token1'], attention_obj['path'], attention_obj['token2']))
if self.config.EXPORT_CODE_VECTORS:
print('Code vector:')
print(' '.join(map(str, raw_prediction.code_vector)))
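# Input format of test.c2v (inferred from PyExtractor.extract_paths below): one method per line, i.e. the
# method name followed by up to MAX_CONTEXTS space-separated contexts of the form "token1,path,token2", e.g.:
#   my|method token1,4526247,token2 token3,9833241,token4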
import numpy as np
import abc
import os
from typing import NamedTuple, Optional, List, Dict, Tuple, Iterable
from common import common
from vocabularies import Code2VecVocabs, VocabType
from config import Config
class ModelEvaluationResults(NamedTuple):
topk_acc: float
subtoken_precision: float
subtoken_recall: float
subtoken_f1: float
loss: Optional[float] = None
def __str__(self):
res_str = 'topk_acc: {topk_acc}, precision: {precision}, recall: {recall}, F1: {f1}'.format(
topk_acc=self.topk_acc,
precision=self.subtoken_precision,
recall=self.subtoken_recall,
f1=self.subtoken_f1)
if self.loss is not None:
res_str = ('loss: {}, '.format(self.loss)) + res_str
return res_str
class ModelPredictionResults(NamedTuple):
original_name: str
topk_predicted_words: np.ndarray
topk_predicted_words_scores: np.ndarray
attention_per_context: Dict[Tuple[str, str, str], float]
code_vector: Optional[np.ndarray] = None
class Code2VecModelBase(abc.ABC):
def __init__(self, config: Config):
self.config = config
self.config.verify()
self._log_creating_model()
if not config.RELEASE:
self._init_num_of_examples()
self._log_model_configuration()
self.vocabs = Code2VecVocabs(config)
self.vocabs.target_vocab.get_index_to_word_lookup_table()
self._load_or_create_inner_model()
self._initialize()
def _log_creating_model(self):
self.log('')
self.log('')
self.log('---------------------------------------------------------------------')
self.log('---------------------------------------------------------------------')
self.log('---------------------- Creating code2vec model ----------------------')
self.log('---------------------------------------------------------------------')
self.log('---------------------------------------------------------------------')
def _log_model_configuration(self):
self.log('---------------------------------------------------------------------')
self.log('----------------- Configuration - Hyper Parameters ------------------')
longest_param_name_len = max(len(param_name) for param_name, _ in self.config)
for param_name, param_val in self.config:
self.log('{name: <{name_len}}{val}'.format(
name=param_name, val=param_val, name_len=longest_param_name_len+2))
self.log('---------------------------------------------------------------------')
@property
def logger(self):
return self.config.get_logger()
def log(self, msg):
self.logger.info(msg)
def _init_num_of_examples(self):
self.log('Checking number of examples ...')
if self.config.is_training:
self.config.NUM_TRAIN_EXAMPLES = self._get_num_of_examples_for_dataset(self.config.train_data_path)
self.log(' Number of train examples: {}'.format(self.config.NUM_TRAIN_EXAMPLES))
if self.config.is_testing:
self.config.NUM_TEST_EXAMPLES = self._get_num_of_examples_for_dataset(self.config.TEST_DATA_PATH)
self.log(' Number of test examples: {}'.format(self.config.NUM_TEST_EXAMPLES))
@staticmethod
def _get_num_of_examples_for_dataset(dataset_path: str) -> int:
dataset_num_examples_file_path = dataset_path + '.num_examples'
if os.path.isfile(dataset_num_examples_file_path):
with open(dataset_num_examples_file_path, 'r') as file:
num_examples_in_dataset = int(file.readline())
else:
num_examples_in_dataset = common.count_lines_in_file(dataset_path)
with open(dataset_num_examples_file_path, 'w') as file:
file.write(str(num_examples_in_dataset))
return num_examples_in_dataset
def load_or_build(self):
self.vocabs = Code2VecVocabs(self.config)
self._load_or_create_inner_model()
def save(self, model_save_path=None):
if model_save_path is None:
model_save_path = self.config.MODEL_SAVE_PATH
model_save_dir = '/'.join(model_save_path.split('/')[:-1])
if not os.path.isdir(model_save_dir):
os.makedirs(model_save_dir, exist_ok=True)
self.vocabs.save(self.config.get_vocabularies_path_from_model_path(model_save_path))
self._save_inner_model(model_save_path)
def _write_code_vectors(self, file, code_vectors):
for vec in code_vectors:
file.write(' '.join(map(str, vec)) + '\n')
def _get_attention_weight_per_context(
self, path_source_strings: Iterable[str], path_strings: Iterable[str], path_target_strings: Iterable[str],
attention_weights: Iterable[float]) -> Dict[Tuple[str, str, str], float]:
attention_weights = np.squeeze(attention_weights, axis=-1) # (max_contexts, )
attention_per_context: Dict[Tuple[str, str, str], float] = {}
for path_source, path, path_target, weight in \
zip(path_source_strings, path_strings, path_target_strings, attention_weights):
string_context_triplet = (common.binary_to_string(path_source),
common.binary_to_string(path),
common.binary_to_string(path_target))
attention_per_context[string_context_triplet] = weight
return attention_per_context
def close_session(self):
pass
@abc.abstractmethod
def train(self):
...
@abc.abstractmethod
def evaluate(self) -> Optional[ModelEvaluationResults]:
...
@abc.abstractmethod
def predict(self, predict_data_lines: Iterable[str]) -> List[ModelPredictionResults]:
...
@abc.abstractmethod
def _save_inner_model(self, path):
...
def _load_or_create_inner_model(self):
if self.config.is_loading:
self._load_inner_model()
else:
self._create_inner_model()
@abc.abstractmethod
def _load_inner_model(self):
...
def _create_inner_model(self):
pass
def _initialize(self):
pass
@abc.abstractmethod
def _get_vocab_embedding_as_np_array(self, vocab_type: VocabType) -> np.ndarray:
...
def save_word2vec_format(self, dest_save_path: str, vocab_type: VocabType):
if vocab_type not in VocabType:
raise ValueError('`vocab_type` should be `VocabType.Token`, `VocabType.Target` or `VocabType.Path`.')
vocab_embedding_matrix = self._get_vocab_embedding_as_np_array(vocab_type)
index_to_word = self.vocabs.get(vocab_type).index_to_word
with open(dest_save_path, 'w') as words_file:
common.save_word2vec_file(words_file, index_to_word, vocab_embedding_matrix)
import tensorflow as tf
from typing import Dict, Tuple, NamedTuple, Union, Optional, Iterable
from config import Config
from vocabularies import Code2VecVocabs
import abc
from functools import reduce
from enum import Enum
class EstimatorAction(Enum):
Train = 'train'
Evaluate = 'evaluate'
Predict = 'predict'
@property
def is_train(self):
return self is EstimatorAction.Train
@property
def is_evaluate(self):
return self is EstimatorAction.Evaluate
@property
def is_predict(self):
return self is EstimatorAction.Predict
@property
def is_evaluate_or_predict(self):
return self.is_evaluate or self.is_predict
class ReaderInputTensors(NamedTuple):
path_source_token_indices: tf.Tensor
path_indices: tf.Tensor
path_target_token_indices: tf.Tensor
context_valid_mask: tf.Tensor
target_index: Optional[tf.Tensor] = None
target_string: Optional[tf.Tensor] = None
path_source_token_strings: Optional[tf.Tensor] = None
path_strings: Optional[tf.Tensor] = None
path_target_token_strings: Optional[tf.Tensor] = None
class ModelInputTensorsFormer(abc.ABC):
@abc.abstractmethod
def to_model_input_form(self, input_tensors: ReaderInputTensors):
...
@abc.abstractmethod
def from_model_input_form(self, input_row) -> ReaderInputTensors:
...
class PathContextReader:
def __init__(self,
vocabs: Code2VecVocabs,
config: Config,
model_input_tensors_former: ModelInputTensorsFormer,
estimator_action: EstimatorAction,
repeat_endlessly: bool = False):
self.vocabs = vocabs
self.config = config
self.model_input_tensors_former = model_input_tensors_former
self.estimator_action = estimator_action
self.repeat_endlessly = repeat_endlessly
self.CONTEXT_PADDING = ','.join([self.vocabs.token_vocab.special_words.PAD,
self.vocabs.path_vocab.special_words.PAD,
self.vocabs.token_vocab.special_words.PAD])
self.csv_record_defaults = [[self.vocabs.target_vocab.special_words.OOV]] + \
([[self.CONTEXT_PADDING]] * self.config.MAX_CONTEXTS)
self.create_needed_vocabs_lookup_tables(self.vocabs)
self._dataset: Optional[tf.data.Dataset] = None
@classmethod
def create_needed_vocabs_lookup_tables(cls, vocabs: Code2VecVocabs):
vocabs.token_vocab.get_word_to_index_lookup_table()
vocabs.path_vocab.get_word_to_index_lookup_table()
vocabs.target_vocab.get_word_to_index_lookup_table()
@tf.function
def process_input_row(self, row_placeholder):
parts = tf.io.decode_csv(
row_placeholder, record_defaults=self.csv_record_defaults, field_delim=' ', use_quote_delim=False)
tensors = self._map_raw_dataset_row_to_input_tensors(*parts)
tensors_expanded = ReaderInputTensors(
**{name: None if tensor is None else tf.expand_dims(tensor, axis=0)
for name, tensor in tensors._asdict().items()})
return self.model_input_tensors_former.to_model_input_form(tensors_expanded)
def process_and_iterate_input_from_data_lines(self, input_data_lines: Iterable) -> Iterable:
for data_row in input_data_lines:
processed_row = self.process_input_row(data_row)
yield processed_row
def get_dataset(self, input_data_rows: Optional = None) -> tf.data.Dataset:
if self._dataset is None:
self._dataset = self._create_dataset_pipeline(input_data_rows)
return self._dataset
def _create_dataset_pipeline(self, input_data_rows: Optional = None) -> tf.data.Dataset:
if input_data_rows is None:
assert not self.estimator_action.is_predict
dataset = tf.data.experimental.CsvDataset(
self.config.data_path(is_evaluating=self.estimator_action.is_evaluate),
record_defaults=self.csv_record_defaults, field_delim=' ', use_quote_delim=False,
buffer_size=self.config.CSV_BUFFER_SIZE)
else:
dataset = tf.data.Dataset.from_tensor_slices(input_data_rows)
dataset = dataset.map(
lambda input_line: tf.io.decode_csv(
tf.reshape(tf.cast(input_line, tf.string), ()),
record_defaults=self.csv_record_defaults,
field_delim=' ', use_quote_delim=False))
if self.repeat_endlessly:
dataset = dataset.repeat()
if self.estimator_action.is_train:
if not self.repeat_endlessly and self.config.NUM_TRAIN_EPOCHS > 1:
dataset = dataset.repeat(self.config.NUM_TRAIN_EPOCHS)
dataset = dataset.shuffle(self.config.SHUFFLE_BUFFER_SIZE, reshuffle_each_iteration=True)
dataset = dataset.map(self._map_raw_dataset_row_to_expected_model_input_form,
num_parallel_calls=self.config.READER_NUM_PARALLEL_BATCHES)
batch_size = self.config.batch_size(is_evaluating=self.estimator_action.is_evaluate)
if self.estimator_action.is_predict:
dataset = dataset.batch(1)
else:
dataset = dataset.filter(self._filter_input_rows)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(buffer_size=40)
return dataset
def _filter_input_rows(self, *row_parts) -> tf.bool:
row_parts = self.model_input_tensors_former.from_model_input_form(row_parts)
any_word_valid_mask_per_context_part = [
tf.not_equal(tf.reduce_max(row_parts.path_source_token_indices, axis=0),
self.vocabs.token_vocab.word_to_index[self.vocabs.token_vocab.special_words.PAD]),
tf.not_equal(tf.reduce_max(row_parts.path_target_token_indices, axis=0),
self.vocabs.token_vocab.word_to_index[self.vocabs.token_vocab.special_words.PAD]),
tf.not_equal(tf.reduce_max(row_parts.path_indices, axis=0),
self.vocabs.path_vocab.word_to_index[self.vocabs.path_vocab.special_words.PAD])]
any_contexts_is_valid = reduce(tf.logical_or, any_word_valid_mask_per_context_part)
if self.estimator_action.is_evaluate:
cond = any_contexts_is_valid
else:
word_is_valid = tf.greater(
row_parts.target_index, self.vocabs.target_vocab.word_to_index[self.vocabs.target_vocab.special_words.OOV]) # scalar
cond = tf.logical_and(word_is_valid, any_contexts_is_valid)
return cond
def _map_raw_dataset_row_to_expected_model_input_form(self, *row_parts) -> \
Tuple[Union[tf.Tensor, Tuple[tf.Tensor, ...], Dict[str, tf.Tensor]], ...]:
tensors = self._map_raw_dataset_row_to_input_tensors(*row_parts)
return self.model_input_tensors_former.to_model_input_form(tensors)
def _map_raw_dataset_row_to_input_tensors(self, *row_parts) -> ReaderInputTensors:
row_parts = list(row_parts)
target_str = row_parts[0]
target_index = self.vocabs.target_vocab.lookup_index(target_str)
contexts_str = tf.stack(row_parts[1:(self.config.MAX_CONTEXTS + 1)], axis=0)
split_contexts = tf.compat.v1.string_split(contexts_str, sep=',', skip_empty=False)
sparse_split_contexts = tf.sparse.SparseTensor(
indices=split_contexts.indices, values=split_contexts.values, dense_shape=[self.config.MAX_CONTEXTS, 3])
dense_split_contexts = tf.reshape(
tf.sparse.to_dense(sp_input=sparse_split_contexts, default_value=self.vocabs.token_vocab.special_words.PAD),
shape=[self.config.MAX_CONTEXTS, 3])
path_source_token_strings = tf.squeeze(
tf.slice(dense_split_contexts, begin=[0, 0], size=[self.config.MAX_CONTEXTS, 1]), axis=1)
path_strings = tf.squeeze(
tf.slice(dense_split_contexts, begin=[0, 1], size=[self.config.MAX_CONTEXTS, 1]), axis=1)
path_target_token_strings = tf.squeeze(
tf.slice(dense_split_contexts, begin=[0, 2], size=[self.config.MAX_CONTEXTS, 1]), axis=1)
path_source_token_indices = self.vocabs.token_vocab.lookup_index(path_source_token_strings)
path_indices = self.vocabs.path_vocab.lookup_index(path_strings)
path_target_token_indices = self.vocabs.token_vocab.lookup_index(path_target_token_strings)
valid_word_mask_per_context_part = [
tf.not_equal(path_source_token_indices, self.vocabs.token_vocab.word_to_index[self.vocabs.token_vocab.special_words.PAD]),
tf.not_equal(path_target_token_indices, self.vocabs.token_vocab.word_to_index[self.vocabs.token_vocab.special_words.PAD]),
tf.not_equal(path_indices, self.vocabs.path_vocab.word_to_index[self.vocabs.path_vocab.special_words.PAD])]
context_valid_mask = tf.cast(reduce(tf.logical_or, valid_word_mask_per_context_part), dtype=tf.float32)
return ReaderInputTensors(
path_source_token_indices=path_source_token_indices,
path_indices=path_indices,
path_target_token_indices=path_target_token_indices,
context_valid_mask=context_valid_mask,
target_index=target_index,
target_string=target_str,
path_source_token_strings=path_source_token_strings,
path_strings=path_strings,
path_target_token_strings=path_target_token_strings
)
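# A minimal sketch of how this reader is driven. The pass-through input-tensors former here is an
# assumption for illustration only; the real formers live in the TF/Keras model files, which are not shown.
'''
Usage
class _PassThroughInputTensorsFormer(ModelInputTensorsFormer):
    def to_model_input_form(self, input_tensors: ReaderInputTensors):
        return tuple(input_tensors)
    def from_model_input_form(self, input_row) -> ReaderInputTensors:
        return ReaderInputTensors(*input_row)

config = Config(set_defaults=True, load_from_args=True, verify=True)
vocabs = Code2VecVocabs(config)
reader = PathContextReader(vocabs=vocabs, config=config,
                           model_input_tensors_former=_PassThroughInputTensorsFormer(),
                           estimator_action=EstimatorAction.Train)
dataset = reader.get_dataset()
'''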
import random
from argparse import ArgumentParser
import common
import pickle
def save_dictionaries(dataset_name, word_to_count, path_to_count, target_to_count,
num_training_examples):
save_dict_file_path = '{}.dict.c2v'.format(dataset_name)
with open(save_dict_file_path, 'wb') as file:
pickle.dump(word_to_count, file)
pickle.dump(path_to_count, file)
pickle.dump(target_to_count, file)
pickle.dump(num_training_examples, file)
print('Dictionaries saved to: {}'.format(save_dict_file_path))
def process_file(file_path, data_file_role, dataset_name, word_to_count, path_to_count, max_contexts):
sum_total = 0
sum_sampled = 0
total = 0
empty = 0
max_unfiltered = 0
output_path = '{}.{}.c2v'.format(dataset_name, data_file_role)
with open(output_path, 'w') as outfile:
with open(file_path, 'r') as file:
for line in file:
parts = line.rstrip('\n').split(' ')
target_name = parts[0]
contexts = parts[1:]
if len(contexts) > max_unfiltered:
max_unfiltered = len(contexts)
sum_total += len(contexts)
if len(contexts) > max_contexts:
context_parts = [c.split(',') for c in contexts]
full_found_contexts = [c for i, c in enumerate(contexts)
if context_full_found(context_parts[i], word_to_count, path_to_count)]
partial_found_contexts = [c for i, c in enumerate(contexts)
if context_partial_found(context_parts[i], word_to_count, path_to_count)
and not context_full_found(context_parts[i], word_to_count,
path_to_count)]
if len(full_found_contexts) > max_contexts:
contexts = random.sample(full_found_contexts, max_contexts)
elif len(full_found_contexts) <= max_contexts \
and len(full_found_contexts) + len(partial_found_contexts) > max_contexts:
contexts = full_found_contexts + \
random.sample(partial_found_contexts, max_contexts - len(full_found_contexts))
else:
contexts = full_found_contexts + partial_found_contexts
if len(contexts) == 0:
empty += 1
continue
sum_sampled += len(contexts)
csv_padding = " " * (max_contexts - len(contexts))
outfile.write(target_name + ' ' + " ".join(contexts) + csv_padding + '\n')
total += 1
print('File: ' + file_path)
print('Average total contexts: ' + str(float(sum_total) / total))
print('Average final (after sampling) contexts: ' + str(float(sum_sampled) / total))
print('Total examples: ' + str(total))
print('Empty examples: ' + str(empty))
print('Max number of contexts per word: ' + str(max_unfiltered))
return total
def context_full_found(context_parts, word_to_count, path_to_count):
return context_parts[0] in word_to_count \
and context_parts[1] in path_to_count and context_parts[2] in word_to_count
def context_partial_found(context_parts, word_to_count, path_to_count):
return context_parts[0] in word_to_count \
or context_parts[1] in path_to_count or context_parts[2] in word_to_count
if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument("-trd", "--train_data", dest="train_data_path",
help="path to training data file", required=True)
parser.add_argument("-ted", "--test_data", dest="test_data_path",
help="path to test data file", required=True)
parser.add_argument("-vd", "--val_data", dest="val_data_path",
help="path to validation data file", required=True)
parser.add_argument("-mc", "--max_contexts", dest="max_contexts", default=200,
help="number of max contexts to keep", required=False)
parser.add_argument("-wvs", "--word_vocab_size", dest="word_vocab_size", default=1301136,
help="Max number of origin word in to keep in the vocabulary", required=False)
parser.add_argument("-pvs", "--path_vocab_size", dest="path_vocab_size", default=911417,
help="Max number of paths to keep in the vocabulary", required=False)
parser.add_argument("-tvs", "--target_vocab_size", dest="target_vocab_size", default=261245,
help="Max number of target words to keep in the vocabulary", required=False)
parser.add_argument("-wh", "--word_histogram", dest="word_histogram",
help="word histogram file", metavar="FILE", required=True)
parser.add_argument("-ph", "--path_histogram", dest="path_histogram",
help="path_histogram file", metavar="FILE", required=True)
parser.add_argument("-th", "--target_histogram", dest="target_histogram",
help="target histogram file", metavar="FILE", required=True)
parser.add_argument("-o", "--output_name", dest="output_name",
help="output name - the base name for the created dataset", metavar="FILE", required=True,
default='data')
args = parser.parse_args()
train_data_path = args.train_data_path
test_data_path = args.test_data_path
val_data_path = args.val_data_path
word_histogram_path = args.word_histogram
path_histogram_path = args.path_histogram
word_histogram_data = common.common.load_vocab_from_histogram(word_histogram_path, start_from=1,
max_size=int(args.word_vocab_size),
return_counts=True)
_, _, _, word_to_count = word_histogram_data
_, _, _, path_to_count = common.common.load_vocab_from_histogram(path_histogram_path, start_from=1,
max_size=int(args.path_vocab_size),
return_counts=True)
_, _, _, target_to_count = common.common.load_vocab_from_histogram(args.target_histogram, start_from=1,
max_size=int(args.target_vocab_size),
return_counts=True)
num_training_examples = 0
for data_file_path, data_role in zip([test_data_path, val_data_path, train_data_path], ['test', 'val', 'train']):
num_examples = process_file(file_path=data_file_path, data_file_role=data_role, dataset_name=args.output_name,
word_to_count=word_to_count, path_to_count=path_to_count,
max_contexts=int(args.max_contexts))
if data_role == 'train':
num_training_examples = num_examples
save_dictionaries(dataset_name=args.output_name, word_to_count=word_to_count,
path_to_count=path_to_count, target_to_count=target_to_count,
num_training_examples=num_training_examples)
TRAIN_DIR=dataset_train
VAL_DIR=dataset_val
TEST_DIR=dataset_test
DATASET_NAME=dataset
MAX_CONTEXTS=200
WORD_VOCAB_SIZE=1301136
PATH_VOCAB_SIZE=911417
TARGET_VOCAB_SIZE=261245
NUM_THREADS=64
PYTHON=python
###########################################################
TRAIN_DATA_PATH=data/path_contexts_train.csv
VAL_DATA_PATH=data/path_contexts_val.csv
TEST_DATA_PATH=data/path_contexts_test.csv
TRAIN_DATA_FILE=${TRAIN_DATA_PATH}
VAL_DATA_FILE=${VAL_DATA_PATH}
TEST_DATA_FILE=${TEST_DATA_PATH}
mkdir -p data
mkdir -p data/${DATASET_NAME}
TARGET_HISTOGRAM_FILE=data/${DATASET_NAME}/${DATASET_NAME}.histo.tgt.c2v
ORIGIN_HISTOGRAM_FILE=data/${DATASET_NAME}/${DATASET_NAME}.histo.ori.c2v
PATH_HISTOGRAM_FILE=data/${DATASET_NAME}/${DATASET_NAME}.histo.path.c2v
cat ${TRAIN_DATA_FILE} | cut -d' ' -f1 | awk '{n[$0]++} END {for (i in n) print i,n[i]}' > ${TARGET_HISTOGRAM_FILE}
cat ${TRAIN_DATA_FILE} | cut -d' ' -f2- | tr ' ' '\n' | cut -d',' -f1,3 | tr ',' '\n' | awk '{n[$0]++} END {for (i in n) print i,n[i]}' > ${ORIGIN_HISTOGRAM_FILE}
cat ${TRAIN_DATA_FILE} | cut -d' ' -f2- | tr ' ' '\n' | cut -d',' -f2 | awk '{n[$0]++} END {for (i in n) print i,n[i]}' > ${PATH_HISTOGRAM_FILE}
DIR=`dirname "$0"`
${PYTHON} ${DIR}/preprocess.py --train_data ${TRAIN_DATA_FILE} --test_data ${TEST_DATA_FILE} --val_data ${VAL_DATA_FILE} \
--max_contexts ${MAX_CONTEXTS} --word_vocab_size ${WORD_VOCAB_SIZE} --path_vocab_size ${PATH_VOCAB_SIZE} \
--target_vocab_size ${TARGET_VOCAB_SIZE} --word_histogram ${ORIGIN_HISTOGRAM_FILE} \
--path_histogram ${PATH_HISTOGRAM_FILE} --target_histogram ${TARGET_HISTOGRAM_FILE} --output_name data/${DATASET_NAME}/${DATASET_NAME}
rm ${TARGET_HISTOGRAM_FILE} ${ORIGIN_HISTOGRAM_FILE} ${PATH_HISTOGRAM_FILE}
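# The three pipelines above only build frequency histograms ("<token> <count>" per line).
# A rough Python equivalent for the target histogram (illustrative only):
#
#   from collections import Counter
#   with open('data/path_contexts_train.csv') as f:
#       counts = Counter(line.split(' ', 1)[0] for line in f)
#   with open('data/dataset/dataset.histo.tgt.c2v', 'w') as out:
#       for word, count in counts.items():
#           out.write('{} {}\n'.format(word, count))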
import subprocess
class PyExtractor:
def __init__(self, config):
self.config = config
def read_file(self, input_filename):
with open(input_filename, 'r') as file:
return file.readlines()
def extract_paths(self, path):
output = self.read_file(path)
if len(output) == 0:
raise ValueError('Failed to extract paths: {} is empty.'.format(path))
hash_to_string_dict = {}
result = []
for i, line in enumerate(output):
parts = line.rstrip().split(' ')
method_name = parts[0]
current_result_line_parts = [method_name]
contexts = parts[1:]
for context in contexts[:self.config.MAX_CONTEXTS]:
context_parts = context.split(',')
context_word1 = context_parts[0]
context_path = context_parts[1]
context_word2 = context_parts[2]
hashed_path = str(context_path)
hash_to_string_dict[hashed_path] = context_path
current_result_line_parts += ['%s,%s,%s' % (context_word1, hashed_path, context_word2)]
space_padding = ' ' * (self.config.MAX_CONTEXTS - len(contexts))
result_line = ' '.join(current_result_line_parts) + space_padding
result.append(result_line)
return result, hash_to_string_dict
type=python
dataset_name=dataset
data_dir=../data/${dataset_name}
data=${data_dir}/${dataset_name}
test_data=${data_dir}/${dataset_name}.val.c2v
model_dir=models/${type}
mkdir -p ${model_dir}
set -e
python -u code2vec.py --data ${data} --save ${model_dir}/saved_model --test ${test_data}
from github import Github
import time
import calendar
DATASET_MAX = 1000
class GithubCrawler:
def __init__(self, token):
self._token = token
self._g = Github(token)
def getTimeLimit(self):
core_rate_limit = self._g.get_rate_limit().core
reset_timestamp = calendar.timegm(core_rate_limit.reset.timetuple())
sleep_time = reset_timestamp - calendar.timegm(time.gmtime()) + 1
return sleep_time
def search_repo(self, keywords, S = 0, E = DATASET_MAX):
if type(keywords) == str:
keywords = [keywords] #auto packing for one keyword
query = '+'.join(keywords) + '+in:readme+in:description'
result = self._g.search_repositories(query)
ret = []
for i in range(S, E):
while True:
try:
r = result[i]
repoName = r.owner.login+'/'+r.name
print("repo found", f"[{i}]:", repoName)
ret.append(repoName)
break
except Exception:
print("Rate Limit Exceeded... Retrying", f"{[i]}", "Limit Time:", self.getTimeLimit())
time.sleep(1)
return ret
def search_files(self, repo_url, downloadLink = False):
while True:
try:
repo = self._g.get_repo(repo_url)
break
except Exception as e:
if '403' in str(e):
print("Rate Limit Exceeded... Retrying", f"{[i]}", "Limit Time:", self.getTimeLimit())
time.sleep(1)
continue
print(e)
return []
try:
contents = repo.get_contents("")
except Exception: #empty repo
return []
files = []
while contents:
file_content = contents.pop(0)
if file_content.type == 'dir':
if 'lib' in file_content.path: #python lib is in repo (too many files)
return []
contents.extend(repo.get_contents(file_content.path))
else:
if downloadLink:
files.append(file_content.download_url)
else:
files.append(file_content.path)
return files
import crawler
import os
import utils
TOKEN = 'YOUR_TOKEN_HERE'
DATASET_DIR = 'YOUR_PATH_HERE'
REPO_PATH = 'repos.txt'
utils.removeEmptyDirectories(DATASET_DIR)
c = crawler.GithubCrawler(TOKEN)
if not os.path.exists(REPO_PATH):
repos = c.search_repo('MNIST+language:python', 1000, 2000)
f = open(REPO_PATH, 'w')
for r in repos:
f.write(r + '\n')
f.close()
else:
f = open(REPO_PATH, 'r')
repos = f.readlines()
f.close()
S = 0
L = len(repos)
print("Found repositories:", L)
for i in range(S, L):
r = repos[i].strip()
savename = r.replace('/', '_')
print('Downloading', f'[{i}] :', savename)
if os.path.exists(os.path.join(DATASET_DIR, savename)):
continue
files = c.search_files(r, True)
files = list(filter(lambda x : utils.isformat(x, ['py', 'ipynb']), files))
if len(files) > 0:
utils.downloadFiles(DATASET_DIR, savename, files)
import os
from requests import get
def isformat(file, typenames):
if type(file) != str:
return False
if type(typenames) == str:
typenames = [typenames]
dot = file.rfind('.')
if dot < 0:
for t in typenames:
if file == t:
return True
return False
ext = file[dot + 1 :]
for t in typenames:
if ext == t:
return True
return False
def downloadFiles(root, dir, urls):
if not os.path.exists(root):
os.mkdir(root)
path = os.path.join(root, dir)
if not os.path.exists(path):
os.mkdir(path)
else:
return
for url in urls:
name = os.path.basename(url)
with open(os.path.join(path, name), 'wb') as f:
try:
response = get(url)
f.write(response.content)
except Exception as e:
print(e)
f.close()
break
f.close()
def removeEmptyDirectories(root):
cnt = 0
for dir in os.listdir(root):
d = os.path.join(root, dir)
if len(os.listdir(d)) == 0: #empty
os.rmdir(d)
cnt += 1
print(cnt, "empty directories removed")
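# Examples (illustrative): isformat('train.py', ['py', 'ipynb']) -> True; isformat('README.md', 'py') -> False.
# downloadFiles saves each URL under <root>/<dir>/<basename(url)>, and removeEmptyDirectories(root)
# removes repository directories whose downloads all failed.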
class Block:
def __init__(self, type, line=''):
self.blocks = list()
self.code = line
self.blockType = type
self.indent = -1
def setIndent(self, indent):
self.indent = indent
def addLine(self, line):
if len(self.code) > 0:
self.code += '\n'
self.code += line
def addBlock(self, block):
self.blocks.append(block)
def debug(self):
if self.blockType != 'TYPE_NORMAL':
print("Block Info:", self.blockType, self.indent)
print(self.code)
for block in self.blocks:
if block.indent <= self.indent:
raise ValueError("Invalid Indent Error Occurred: {}, INDENT {} included in {}, INDENT {}".format(block.code, block.indent, self.code, self.indent))
block.debug()
def __str__(self):
if len(self.code) > 0:
result = self.code + '\n'
else:
result = ''
for block in self.blocks:
result += block.__str__()
return result
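# A minimal sketch of how Block trees are assembled by hand (file_parser.parse_block below builds them from real source lines):
'''
Usage
root = Block('TYPE_ROOT')
cls = Block('TYPE_CLASS', 'class Foo:')
cls.setIndent(0)
method = Block('TYPE_DEF', 'def bar(self):')
method.setIndent(4)
method.addLine('return 42')
cls.addBlock(method)
root.addBlock(cls)
print(str(root))
'''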
from utils import *
import file_parser
import random
def merge_two_files(input, output): # pair up files from input (in random order), merge and shuffle their code blocks, and write the results to output
ori_files = [f for f in readdir(input) if is_extension(f, 'py')]
files = ori_files.copy()
random.shuffle(files)
os.makedirs(output, exist_ok=True) # create the output directory if not exists
log = open(os.path.join(output, 'log.txt'), 'w', encoding='utf8')
index = 1
while len(files) > 0:
if len(files) == 1:
one = random.choice(ori_files)
while one == files[0]: # emulate a do-while loop: re-pick until it differs from files[0]
one = random.choice(ori_files)
pick = [files[0], one]
else:
pick = files[:2]
files = files[2:]
lines1 = read_file(pick[0])
lines2 = read_file(pick[1])
print("Merging:", pick[0], pick[1])
block1 = file_parser.parse_block(lines1)
block2 = file_parser.parse_block(lines2)
for b in block2.blocks:
block1.addBlock(b)
shuffle_block(block1)
write_block(os.path.join(output, '{}.py'.format(index)), block1)
log.write('{}.py {} {}\n'.format(index, pick[0], pick[1]))
index += 1
log.close()
print("Done generating Merged Dataset")
print("log.txt generated in output path, for merged file info. [merge_file_name file1 file2]")
'''
Usage: merge_two_files('data/original', 'data/merged')
'''
from utils import *
import file_parser
import re
# obfuscator v1 reuses existing names: the detected variable/method names are shuffled among themselves
def detect_vars(line): # detect variables and return range tuples. except for keywords
ret = list()
s = 0
e = 0
detected = False
strException = False
strCh = None
line += ' ' # for last separator
for i in range(len(line)):
c = line[i]
if not strException and (c == "'" or c == '"'): # we cannot remove string first, because index gets changed
strCh = c
strException = True
continue
if strException:
if c == strCh:
strException = False
continue
if not detected and re.match('[A-Za-z_]', c):
detected = True
s = i
continue
if detected and not re.match('[A-Za-z_0-9]', c):
detected = False
e = i
ret.append((s, e))
return ret
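# e.g. detect_vars("x = foo(y)") -> [(0, 1), (4, 7), (8, 9)]  (the spans of 'x', 'foo' and 'y')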
def obfuscate(lines, vars, dictionary, mapper): # obfuscate the code
ret = list()
### write_file('D:/Develop/ori.py', lines)
for line in lines:
var_ranges = detect_vars(line)
var_ranges = [(s, e) for (s, e) in var_ranges if line[s:e] in vars] # remove keywords (do not convert to words because of string exception)
var_ranges.append((-1, -1)) # for out-of-range exception
var_index = 0
new_line = ''
i = 0
L = len(line)
while i < L:
if i == var_ranges[var_index][0]: # found var
s, e = var_ranges[var_index]
new_line += vars[mapper[dictionary[line[s:e]]]]
i = e
var_index += 1
else:
new_line += line[i]
i += 1
ret.append(new_line)
### write_file('D:/Develop/obf.py', ret)
return ret
def create_var_histogram(input, outPath):
files = [f for f in readdir(input) if is_extension(f, 'py')]
freq_dict = dict()
for p in files:
lines = read_file(p)
lines = remove_unnecessary_comments(lines)
for line in lines:
file_parser.parse_keywords(line, freq_dict)
hist = open(outPath, 'w', encoding='utf8')
arr = sorted(freq_dict.items(), key=select_value)
for i in arr:
hist.write(str(i) + '\n')
hist.close()
def read_histogram(inputPath):
lines = read_file(inputPath)
ret = []
for line in lines:
line = line.split("'")[1]
ret.append(line)
return ret
def obfuscate_files(input, output, var=None, threshold=4000): # obfuscate variables. Guessing variable names from keyword frequency (threshold) if variable list is not given
files = [f for f in readdir(input) if is_extension(f, 'py')]
freq_dict = dict()
codes = list()
for p in files:
lines = read_file(p)
lines = remove_unnecessary_comments(lines) # IMPORTANT: remove comments from lines for preprocessing
codes.append((p, lines))
if var == None:
for line in lines:
file_parser.parse_keywords(line, freq_dict)
if var == None: # don't have variable list
hist = open(os.path.join(output, 'log.txt'), 'w', encoding='utf8')
arr = sorted(freq_dict.items(), key=select_value)
for i in arr:
hist.write(str(i) + '\n')
hist.close()
var, _ = threshold_dict(freq_dict, threshold)
var = [v[0] for v in var]
dictionary = create_dictionary(var)
mapper = create_mapper(len(var))
### obfuscate(codes[0][1], var, dictionary, mapper)
for path, code in codes:
obfuscated = obfuscate(code, var, dictionary, mapper)
filepath = path.split(input)[1][1:]
os.makedirs(os.path.join(output, filepath.split('\\')[0]), exist_ok=True) # create the output directory if not exists
new_path = os.path.join(output, filepath)
write_file(new_path, obfuscated)
print("Done generating Obfuscated Dataset")
'''
Usage
obfuscate_files('data/original', 'data/obfuscated')
'''
from utils import *
import file_parser
import re
# obfuscator v2 generates random names for the detected variables/methods
def random_character(start=False):
if start:
x = random.randint(0, 52)
if x == 0:
return '_'
elif x <= 26:
return chr(65 + x - 1)
else:
return chr(97 + x - 27)
x = random.randint(0, 62)
if x == 0:
return '_'
elif x <= 26:
return chr(65 + x - 1)
elif x <= 52:
return chr(97 + x - 27)
else:
return str(x - 53)
def create_mapper_v2(L):
ret = []
while len(ret) < L:
length = random.randint(0, 8) + 4
s = random_character(True)
while len(s) < length:
s += random_character()
if not s in ret:
ret.append(s)
return ret
def detect_vars(line): # detect variables and return range tuples. except for keywords
ret = list()
s = 0
e = 0
detected = False
strException = False
strCh = None
line += ' ' # for last separator
for i in range(len(line)):
c = line[i]
if not strException and (c == "'" or c == '"'): # we cannot remove string first, because index gets changed
strCh = c
strException = True
continue
if strException:
if c == strCh:
strException = False
continue
if not detected and re.match('[A-Za-z_]', c):
detected = True
s = i
continue
if detected and not re.match('[A-Za-z_0-9]', c):
detected = False
e = i
ret.append((s, e))
return ret
def obfuscate(lines, vars, dictionary, mapper): # obfuscate the code
ret = list()
### write_file('D:/Develop/ori.py', lines)
for line in lines:
var_ranges = detect_vars(line)
var_ranges = [(s, e) for (s, e) in var_ranges if line[s:e] in vars] # remove keywords (do not convert to words because of string exception)
var_ranges.append((-1, -1)) # for out-of-range exception
var_index = 0
new_line = ''
i = 0
L = len(line)
while i < L:
if i == var_ranges[var_index][0]: # found var
s, e = var_ranges[var_index]
new_line += mapper[dictionary[line[s:e]]]
i = e
var_index += 1
else:
new_line += line[i]
i += 1
ret.append(new_line)
### write_file('D:/Develop/obf.py', ret)
return ret
def create_var_histogram(input, outPath):
files = [f for f in readdir(input) if is_extension(f, 'py')]
freq_dict = dict()
for p in files:
lines = read_file(p)
lines = remove_unnecessary_comments(lines)
for line in lines:
file_parser.parse_keywords(line, freq_dict)
hist = open(outPath, 'w', encoding='utf8')
arr = sorted(freq_dict.items(), key=select_value)
for i in arr:
hist.write(str(i) + '\n')
hist.close()
def read_histogram(inputPath):
lines = read_file(inputPath)
ret = []
for line in lines:
line = line.split("'")[1]
ret.append(line)
return ret
def obfuscate_files(input, output, var=None, threshold=4000): # obfuscate variables. Guessing variable names from keyword frequency (threshold) if variable list is not given
files = [f for f in readdir(input) if is_extension(f, 'py')]
freq_dict = dict()
codes = list()
for p in files:
lines = read_file(p)
lines = remove_unnecessary_comments(lines) # IMPORTANT: remove comments from lines for preprocessing
codes.append((p, lines))
if var == None:
for line in lines:
file_parser.parse_keywords(line, freq_dict)
if var == None: # don't have variable list
hist = open(os.path.join(output, 'log.txt'), 'w', encoding='utf8')
arr = sorted(freq_dict.items(), key=select_value)
for i in arr:
hist.write(str(i) + '\n')
hist.close()
var, _ = threshold_dict(freq_dict, threshold)
var = [v[0] for v in var]
dictionary = create_dictionary(var)
mapper = create_mapper_v2(len(var))
### obfuscate(codes[0][1], var, dictionary, mapper)
for path, code in codes:
obfuscated = obfuscate(code, var, dictionary, mapper)
filepath = path.split(input)[1][1:]
os.makedirs(os.path.join(output, filepath.split('\\')[0]), exist_ok=True) # create the output directory if not exists
new_path = os.path.join(output, filepath)
write_file(new_path, obfuscated)
print("Done generating Obfuscated Dataset")
'''
Usage
obfuscate_files('data/original', 'data/obfuscated')
'''
from utils import *
import file_parser
import random
def refine_files(input, output):
files = [f for f in readdir(input) if is_extension(f, 'py')]
random.shuffle(files)
for p in files:
lines = read_file(p)
print("Refining:", p)
block = file_parser.parse_block(lines)
filepath = p.split(input)[1][1:]
os.makedirs(os.path.join(output, filepath.split('\\')[0]), exist_ok=True) # create the output directory if not exists
path = os.path.join(output, filepath)
write_block(path, block)
print("Done generating Refined Dataset")
from utils import *
import file_parser
import random
def shuffle_files(input, output): # shuffle the top-level code blocks of each file and write the result to output
files = [f for f in readdir(input) if is_extension(f, 'py')]
random.shuffle(files)
for p in files:
lines = read_file(p)
print("Shuffling:", p)
block = file_parser.parse_block(lines)
shuffle_block(block)
filepath = p.split(input)[1][1:]
os.makedirs(os.path.join(output, filepath.split('\\')[0]), exist_ok=True) # create the output directory if not exists
path = os.path.join(output, filepath)
write_block(path, block)
print("Done generating Shuffled Dataset")
'''
shuffle_files('data/original', 'data/shuffled')
'''
from utils import *
import re
import keyword
'''
Test multi-line comments
'''
LIBRARYS = list()
def parse_keywords(line, out): # out : output dictionary to sum up frequencies
line = line.strip()
line = remove_string(line)
result = ''
for c in line:
if re.match('[A-Za-z_@0-9]', c):
result += c
else:
result += ' '
import_line = False
prev_key = ''
for key in result.split(' '):
if not key or is_number(key) or key[0] in "0123456789":
continue
## Exception code here
if key in ['from', 'import']:
import_line = True
if import_line and prev_key != 'as':
if not key in LIBRARYS:
LIBRARYS.append(key)
prev_key = key
continue
if key in keyword.kwlist or key in LIBRARYS or '@' in key:
prev_key = key
continue
prev_key = key
##
if not key in out:
out[key] = 1
else:
out[key] += 1
def parse_block(lines): # parse to import / def / class / normal (if, for, etc)
lines = remove_unnecessary_comments(lines)
root = Block('TYPE_ROOT') # main block tree node
block_stack = [root]
i = 0
L = len(lines)
# par_stack = list()
# multi_string_stack = list()
while i < L:
line = lines[i]
start_index = 0
indent_count = 0
while True: # count indents
if line[start_index] == '\t':
start_index += 1
indent_count += 4
elif line[start_index] == ' ':
start_index += 1
indent_count += 1
else:
break
block = create_block_from_line(line)
block.setIndent(indent_count)
if block.blockType == 'TYPE_FACTORY': # handle the @decorator (factory/property) exception
i += 1
temp = create_block_from_line(lines[i])
if temp.blockType == 'TYPE_CLASS':
block.addLine(lines[i])
block.blockType = 'TYPE_CLASS'
elif temp.blockType == 'TYPE_DEF':
block.addLine(lines[i])
block.blockType = 'TYPE_DEF'
else: # unknown type exception (factory single lines, or multi line code)
i -= 1 # roll back
'''
### code for multi-line string/code detection, but it has too many exceptions (most code parses correctly from the indent parsing alone)
line = lines[i]
if detect_parenthesis(line, par_stack) or detect_multi_string(line, multi_string_stack) or detect_multi_line_code(lines[i]): # code is not ended in a single line
i += 1
while detect_parenthesis(lines[i], par_stack) or detect_multi_string(lines[i], multi_string_stack) or detect_multi_line_code(lines[i]):
block.addLine(lines[i])
i += 1
block.addLine(lines[i])
'''
if indent_count == block_stack[-1].indent: # same indent -> change the block
block_stack.pop()
block_stack[-1].addBlock(block)
block_stack.append(block)
elif indent_count > block_stack[-1].indent: # block included in previous block
block_stack[-1].addBlock(block)
block_stack.append(block)
else: # block ended
while indent_count <= block_stack[-1].indent:
block_stack.pop()
block_stack[-1].addBlock(block)
block_stack.append(block)
i += 1
return root
"""
Usage
path = 'data/test.py'
f = open(path, 'r')
lines = f.readlines()
f.close()
block = parse_block(lines)
block.debug()
'''
keywords = dict()
parse_keywords(lines, keywords)
for k, v in keywords.items():
print(k,':',v)
a, b = threshold_dict(keywords, 3)
print(a)
print(b)
'''
"""
'''
d = dict()
parse_keywords('from test.library import a as x, b as y', d)
print(d)
'''
from utils import remove_string
import utils
import data_merger
import data_refiner
import data_shuffler
import file_parser
import data_obfuscator_v2
if __name__ == '__main__':
input_path = 'data/original'
data_refiner.refine_files(input_path, 'data/refined')
data_merger.merge_two_files(input_path, 'data/merged')
data_shuffler.shuffle_files(input_path, 'data/shuffled')
vars = data_obfuscator_v2.read_histogram('data/histogram_v1.txt')
data_obfuscator_v2.obfuscate_files(input_path, 'data/obfuscated2', vars)
# utils.write_file('data/keyword_examples.txt', utils.search_keyword(input_path, 'rand'))
# data_obfuscator.create_var_histogram(input_path, 'data/histogram.txt')
from block import Block
import bisect
import os
import re
import random
TYPE_CLASS = ['class']
TYPE_DEF = ['def']
TYPE_IMPORT = ['from', 'import']
TYPE_CONDITION = ['if', 'elif', 'else', 'for', 'while', 'with']
multi_line_comments = ["'''", '"""']
def select_value(x):
return x[1]
def threshold_dict(d, val): # split dict in two by thresholding the value
arr = sorted(d.items(), key=select_value)
index = bisect.bisect_left([r[1] for r in arr], val)
return arr[:index], arr[index:]
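# e.g. threshold_dict({'a': 1, 'b': 5}, 3) -> ([('a', 1)], [('b', 5)])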
def is_number(s):
if s[0] == '-':
s = s[1:]
return s.replace('.','',1).isdigit()
def is_extension(f, ext):
return os.path.splitext(f)[1][1:] == ext
def _readdir_r(dirpath): # readdir for recursive
ret = []
for f in os.listdir(dirpath):
ret.append(os.path.join(dirpath, f))
return ret
def readdir(path): # read files from the directory
pathList = [path]
result = []
i = 0
while i < len(pathList):
f = pathList[i]
if os.path.isdir(f):
pathList += _readdir_r(f)
else:
result.append(f)
i += 1
return result
def remove_string(line):
strIn = False
strCh = None
result = ''
i = 0
L = len(line)
while i < L:
if i + 3 < L:
if line[i:i+3] in multi_line_comments:
if not strIn:
strIn = True
strCh = line[i:i+3]
elif line[i:i+3] == strCh:
strIn = False
i += 2
continue
c = line[i]
i += 1
if c == '\'' or c == '\"':
if not strIn:
strIn = True
strCh = c
elif c == strCh:
strIn = False
continue
if strIn:
continue
result += c
return result
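# e.g. remove_string("a = 'hi' + b") -> 'a =  + b'  (string literals are dropped, the surrounding code is kept)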
def using_multi_string(line, index):
line = line.strip()
for comment in multi_line_comments:
if line.find(comment, index) > 0:
return True
return False
def remove_unnecessary_comments(lines):
# Warning: cannot reliably detect all multi-line comments, because a multi-line comment is really just a multi-line string.
# TODO: the multi-line string parser will not work well when more than one string (or comment) appears in a statement, e.g.:
# ex) a = ''' d ''' + '''
# abc ''' + '''
# x'''
result = []
multi_line = False
multi_string = False
strCh = None
for line in lines:
find_str_index = 0
if multi_string:
if strCh in line:
find_str_index = line.find(strCh) + 3
multi_string = False
strCh = None
result.append(line)
continue
if multi_line: # parsing multi-line comments
if strCh in line:
multi_line = False
strCh = None
continue
if using_multi_string(line, find_str_index):
i1 = line.find(multi_line_comments[0])
i2 = line.find(multi_line_comments[1])
if i1 < 0:
i1 = len(line) + 1
if i2 < 0:
i2 = len(line) + 1
if i1 < i2:
strCh = multi_line_comments[0]
else:
strCh = multi_line_comments[1]
result.append(line)
if line.count(strCh) % 2 != 0:
multi_string = True
continue
code = line.strip()
if code[:3] in multi_line_comments: # detect in-out of multi-line comments
if code.count(code[:3]) % 2 != 0: # comment count in odd numbers (start or end of comment is in the line)
multi_line = True
strCh = code[:3]
continue
comment_index = line.find('#')
if comment_index >= 0: # one line comment found
line = line[:comment_index]
line = line.rstrip() # remove rightmost spaces
if len(line) == 0: # no code in this line
continue
result.append(line) # add to results
return result
def create_block_from_line(line):
_line = remove_string(line)
_line = _line.strip()
if '@' in _line:
return Block('TYPE_FACTORY', line)
keywords = _line.split(' ')
for key in keywords:
if key in TYPE_IMPORT:
return Block('TYPE_IMPORT', line)
if key in TYPE_CLASS:
return Block('TYPE_CLASS', line)
if key in TYPE_DEF:
return Block('TYPE_DEF', line)
if key in TYPE_CONDITION:
return Block('TYPE_CONDITION', line)
return Block('TYPE_NORMAL', line)
def create_dictionary(arr): # create index dictionary for str array
ret = dict()
key = 0
for name in arr:
ret[name] = key
key += 1
return ret
def create_mapper(L): # create mapping array to match each index in range L
arr = list(range(L))
random.shuffle(arr)
ret = arr.copy()
for i in range(L):
ret[i] = arr[i]
return ret
def read_file(path):
f = open(path, 'r', encoding='utf8')
ret = f.readlines()
f.close()
return ret
def write_file(path, lines):
f = open(path, 'w', encoding='utf8')
for line in lines:
if '\n' in line:
f.write(line)
else:
f.write(line + '\n')
f.close()
def write_block(path, block):
f = open(path, 'w', encoding='utf8')
f.write(str(block))
f.close()
def shuffle_block(block):
if block.blockType != 'TYPE_CLASS' and block.blockType != 'TYPE_ROOT':
return
for b in block.blocks:
shuffle_block(b)
random.shuffle(block.blocks)
def detect_multi_string(line, stack):
L = len(line)
for i in range(L):
if i + 3 > L:
break
s = line[i:i+3]
if s in multi_line_comments:
if len(stack) > 0 and stack[-1] == s:
stack.pop()
elif len(stack) == 0:
stack.append(s)
return len(stack) > 0
def detect_parenthesis(line, stack):
line = remove_string(line)
for c in line:
if c == '(':
stack.append(1)
elif c == ')':
stack.pop()
if len(stack) > 0:
print(line)
return len(stack) > 0
def detect_multi_line_code(line):
line = line.rstrip()
return len(line) > 0 and line[-1] == '\\'
def search_keyword(path, keyword, fast_detect=False): # if fast_detect is True, only check whether the keyword substring appears in the line
files = [f for f in readdir(path) if is_extension(f, 'py')]
result = list()
for p in files:
lines = read_file(p)
lines = remove_unnecessary_comments(lines)
for line in lines:
if fast_detect:
if keyword in line:
result.append(line)
continue
x = ''
for c in line:
if re.match('[A-Za-z_@0-9]', c):
x += c
else:
x += ' '
keywords = x.split(' ')
if keyword in keywords:
result.append(line)
return result
import os
MAX_SEQ_LENGTH = 384
BATCH_SIZE = 64
EPOCHS = 50
BASE_OUTPUT = "output/siamese"
DATASET_PATH = "data/pair_dataset.npz" #path for generated pair dataset
VECTOR_PATH = "data/vectors.npz" #path for feature vectors from code dataset
EMBEDDING_PATH = "data/embedding.npz" #path for embedding vector
MODEL_PATH = os.path.sep.join([BASE_OUTPUT, "siamese_model"])
PLOT_PATH = os.path.sep.join([BASE_OUTPUT, "plot.png"])
import numpy as np
import random
import pandas as pd
from keras.preprocessing.text import Tokenizer
from utils import *
def save_dataset(path, pairData, pairLabels, compressed=True):
if compressed:
np.savez_compressed(path, pairData=pairData, pairLabels=pairLabels)
else:
np.savez(path, pairData=pairData, pairLabels=pairLabels)
def load_dataset(path):
data = np.load(path, allow_pickle=True)
return (data['pairData'], data['pairLabels'])
def make_dataset_small(path): # could not build the pair dataset for the shuffled/merged/obfuscated sets because memory ran out
vecs = np.load(path, allow_pickle=True)['vecs']
pairData = []
pairLabels = [] # 1 for plagiarism
# original pair
for i in range(len(vecs)):
currentData = vecs[i]
pairData.append([currentData, currentData])
pairLabels.append([1])
j = i
while j == i:
j = random.randint(0, len(vecs) - 1)
pairData.append([currentData, vecs[j]])
pairLabels.append([0])
return (np.array(pairData), np.array(pairLabels))
def load_embedding(path):
data = np.load(path, allow_pickle=True)
return (data['vocab_size'], data['embedding_matrix'])
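# Usage sketch (illustrative, not part of the original sources): build the positive/negative
# pair dataset from the pre-computed code vectors and cache it on disk. The paths come from
# config.py, and vectors.npz is assumed to store its array under the key 'vecs'.
# import config
# pairData, pairLabels = make_dataset_small(config.VECTOR_PATH)
# save_dataset(config.DATASET_PATH, pairData, pairLabels)
# pairData, pairLabels = load_dataset(config.DATASET_PATH)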
import re
from utils import remove_string
def parse_keywords(line):
line = line.strip()
line = remove_string(line)
result = ''
for c in line:
if re.match('[A-Za-z_@0-9]', c):
result += c
else:
result += ' '
return result.split(' ')
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Layer
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Embedding
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
class ManDist(Layer):
def __init__(self, **kwargs):
self.result = None
super(ManDist, self).__init__(**kwargs)
def build(self, input_shape):
super(ManDist, self).build(input_shape)
def call(self, x, **kwargs):
self.result = K.exp(-K.sum(K.abs(x[0] - x[1]), axis=1, keepdims=True))
return self.result
    def compute_output_shape(self, input_shape):
return K.int_shape(self.result)
def build_siamese_model(embedding_matrix, embeddingDim, max_sequence_length=384, number_lstm_units=50, rate_drop_lstm=0.01):
x = Sequential()
x.add(Embedding(len(embedding_matrix), embeddingDim, weights=[embedding_matrix], input_shape=(max_sequence_length,), trainable=False))
x.add(LSTM(number_lstm_units, dropout=rate_drop_lstm, return_sequences=True, activation='softmax'))
input_1 = Input(shape=(max_sequence_length,), dtype='int32')
input_2 = Input(shape=(max_sequence_length,), dtype='int32')
distance = ManDist()([x(input_1), x(input_2)])
model = Model(inputs=[input_1, input_2], outputs=[distance])
model.compile(loss='mean_squared_error', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])
return model
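# Illustrative usage (not part of the original sources): build the siamese network from a
# hypothetical embedding matrix; the vocabulary size below is an arbitrary assumption.
# import numpy as np
# emb = np.random.rand(5000, 384).astype('float32')   # hypothetical (vocab_size, dim) matrix
# siamese = build_siamese_model(emb, 384)
# siamese.summary()   # two int32 inputs of length 384, one ManDist similarity output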
import config
from tensorflow.keras.models import load_model
from gensim.models import KeyedVectors
from file_parser import parse_keywords
import tensorflow as tf
from utils import *
import random
import numpy as np
def avg_feature_vector(text, model, num_features, index2word_set):
words = parse_keywords(text)
feature_vec = np.zeros((num_features,), dtype='float32')
n_words = 0
for word in words:
if word in index2word_set:
n_words += 1
feature_vec = np.add(feature_vec, model[word])
if (n_words > 0):
feature_vec = np.divide(feature_vec, n_words)
return feature_vec
def compare(c2v_model, model, dir1, dir2):
files = [f for f in readdir(dir1) if is_extension(f, 'py')]
    idx = 0
    L = len(files)
data = []
index2word_set = set(c2v_model.index_to_key)
for f in files:
print(idx,"/",L)
f2 = dir2 + f.split(dir1)[1]
text1 = readAll(f)
text2 = readAll(f2)
input1 = avg_feature_vector(text1, c2v_model, 384, index2word_set)
input2 = avg_feature_vector(text2, c2v_model, 384, index2word_set)
        data.append([input1, input2])
        idx += 1
    data = np.array(data)
    # the siamese model has two separate inputs, so pass one batch per branch
    result = model.predict([data[:, 0], data[:, 1]])
    print(result)
vectors_text_path = 'data/targets.txt'
c2v_model = KeyedVectors.load_word2vec_format(vectors_text_path, binary=False)
model = load_model(config.MODEL_PATH)
# Usage
# compare(c2v_model, model, 'data/refined', 'data/shuffled')
import config
from dataset import load_dataset
from tensorflow.keras.models import load_model
import tensorflow as tf
pairData, pairLabels = load_dataset(config.DATASET_PATH)
print("Loaded Dataset")
X1 = pairData[:, 0].tolist()
X2 = pairData[:, 1].tolist()
Label = pairLabels[:].tolist()
X1 = tf.convert_to_tensor(X1)
X2 = tf.convert_to_tensor(X2)
Label = tf.convert_to_tensor(Label)
model = load_model(config.MODEL_PATH)
result = model.evaluate([X1, X2], Label, batch_size=64)
print("test loss, test acc:", result)
from utils import plot_training
import config
import os
import numpy as np
import random
import tensorflow as tf
from dataset import load_dataset, load_embedding, make_dataset_small, save_dataset
from model import build_siamese_model
from tensorflow.keras.models import load_model
from tensorflow.keras.callbacks import Callback
# load dataset
if os.path.exists(config.DATASET_PATH):
pairData, pairLabels = load_dataset(config.DATASET_PATH)
print("Loaded Dataset")
else:
print("Generating Dataset...")
pairData, pairLabels = make_dataset_small(config.VECTOR_PATH)
save_dataset(config.DATASET_PATH, pairData, pairLabels)
print("Saved Dataset")
# build model
if not os.path.exists(config.MODEL_PATH):
print("Loading Embedding Vectors...")
vocab_size, embedding_matrix = load_embedding(config.EMBEDDING_PATH)
print("Building Models...")
model = build_siamese_model(embedding_matrix, 384)
else:
model = load_model(config.MODEL_PATH)
# train model
X1 = pairData[:, 0].tolist()
X2 = pairData[:, 1].tolist()
Label = pairLabels[:].tolist()
X1 = tf.convert_to_tensor(X1)
X2 = tf.convert_to_tensor(X2)
Label = tf.convert_to_tensor(Label)
Length = int(len(X1) * 0.7)
trainX1, testX1 = X1[:Length], X1[Length:]  # keep the held-out 30% disjoint from the training split
trainX2, testX2 = X2[:Length], X2[Length:]
trainY, testY = Label[:Length], Label[Length:]
print("Training Model...")
history = model.fit([trainX1, trainX2], trainY, batch_size=config.BATCH_SIZE, epochs=config.EPOCHS,
validation_data=([testX1, testX2], testY))
print("Saving Model...")
model.save(config.MODEL_PATH)
print("Saved Model")
plot_training(history, config.PLOT_PATH)
import os
import re
import matplotlib.pyplot as plt
multi_line_comments = ["'''", '"""']
def remove_string(line):
strIn = False
strCh = None
result = ''
i = 0
L = len(line)
    while i < L:
        if i + 3 <= L and line[i:i+3] in multi_line_comments:
            if not strIn:
                strIn = True
                strCh = line[i:i+3]
            elif line[i:i+3] == strCh:
                strIn = False
            i += 3  # skip all three quote characters
            continue
c = line[i]
i += 1
if c == '\'' or c == '\"':
if not strIn:
strIn = True
strCh = c
elif c == strCh:
strIn = False
continue
if strIn:
continue
result += c
return result
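# Illustrative example (not part of the original sources): remove_string drops string
# literal contents so later keyword and parenthesis scans are not confused by them.
# remove_string("print('hello (world)') # done")   -> 'print() # done'
# remove_string('x = "a" + "b"')                   -> 'x =  + '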
def is_extension(f, ext):
return os.path.splitext(f)[1][1:] == ext
def _readdir_r(dirpath): # recursive helper for readdir
ret = []
for f in os.listdir(dirpath):
ret.append(os.path.join(dirpath, f))
return ret
def readdir(path): # recursively list every file under the directory
pathList = [path]
result = []
i = 0
while i < len(pathList):
f = pathList[i]
if os.path.isdir(f):
pathList += _readdir_r(f)
else:
result.append(f)
i += 1
return result
def readAll(path):
    with open(path, 'r', encoding='utf8') as f:
        return f.read()
def readLines(path):
    with open(path, 'r', encoding='utf8') as f:
        return f.readlines()
def plot_training(H, plotPath):
plt.style.use("ggplot")
plt.figure()
plt.plot(H.history["loss"], label="train_loss")
plt.plot(H.history["val_loss"], label="val_loss")
plt.plot(H.history["accuracy"], label="train_acc")
plt.plot(H.history["val_accuracy"], label="val_acc")
plt.title("Training Loss and Accuracy")
plt.xlabel("Epoch #")
plt.ylabel("Loss/Accuracy")
plt.legend(loc="lower left")
plt.savefig(plotPath)
from gensim.models import KeyedVectors
import text2vec
import random
from utils import *
import matplotlib.pyplot as plt
vectors_text_path = 'data/targets.txt' # w2v output file from model
model = KeyedVectors.load_word2vec_format(vectors_text_path, binary=False)
def compare(dir1, dir2):
files = [f for f in readdir(dir1) if is_extension(f, 'py')]
plt.ylabel('cos_sim')
m = 10
Mx = 0
idx = 0
L = len(files)
for f in files:
print(idx,"/",L)
f2 = dir2 + f.split(dir1)[1]
text1 = readAll(f)
text2 = readAll(f2)
similarity = text2vec.get_similarity(text1, text2, model, 384)
m = min(m, similarity)
Mx = max(Mx, similarity)
plt.plot(idx, similarity, 'r.')
idx += 1
print("min:", m, "max:", Mx)
plt.show()
def compare2(path): # for merged dataset
pairs = read_file(path + '/log.txt') # log file format: path_merged path_source1 path_source2
plt.ylabel('cos_sim')
m = 10
Mx = 0
idx = 0
L = len(pairs)
s1 = []
s2 = []
for p in pairs:
print(idx,"/",L)
arr = p.split(' ')
C = path + '/' + arr[0].strip()
A = arr[1].strip()
B = arr[2].strip()
text_A = readAll(A)
text_B = readAll(B)
text_C = readAll(C)
similarity = text2vec.get_similarity(text_A, text_C, model, 384)
m = min(m, similarity)
Mx = max(Mx, similarity)
s1.append(similarity)
similarity = text2vec.get_similarity(text_B, text_C, model, 384)
m = min(m, similarity)
Mx = max(Mx, similarity)
s2.append(similarity)
idx += 1
print("min:", m, "max:", Mx)
plt.plot(s1, 'r.')
plt.waitforbuttonpress()
plt.cla()
plt.plot(s2, 'b.')
plt.show()
def compare3(dir): # compare files of the original dataset against each other (O(n^2); beware of long processing time)
files = [f for f in readdir(dir) if is_extension(f, 'py')]
plt.ylabel('cos_sim')
m = 10
Mx = 0
idx = 0
L = len(files)
data = []
for f in files:
print(idx,"/",L)
text = readAll(f)
data.append(text)
idx += 1
for i in range(L):
print(i)
j = i
if i == 0:
continue
while j == i:
j = random.choice(list(range(i)))
similarity = text2vec.get_similarity(data[i], data[j], model, 384)
m = min(m, similarity)
Mx = max(Mx, similarity)
plt.plot(i, similarity, 'r.')
print("min:", m, "max:", Mx)
plt.show()
# Usage
# compare('data/refined', 'data/obfuscated2')
# compare2('data/merged')
# compare3('data/refined')
import re
from utils import remove_string
def parse_keywords(line):
line = line.strip()
line = remove_string(line)
result = ''
for c in line:
if re.match('[A-Za-z_@0-9]', c):
result += c
else:
result += ' '
return result.split(' ')
from file_parser import parse_keywords
import numpy as np
from scipy import spatial
def avg_feature_vector(text, model, num_features, index2word_set):
words = parse_keywords(text)
feature_vec = np.zeros((num_features, ), dtype='float32')
n_words = 0
for word in words:
if word in index2word_set:
n_words += 1
feature_vec = np.add(feature_vec, model[word])
if (n_words > 0):
feature_vec = np.divide(feature_vec, n_words)
return feature_vec
def get_similarity(text1, text2, model, num_features):
index2word_set = set(model.index_to_key)
s1 = avg_feature_vector(text1, model, num_features, index2word_set)
s2 = avg_feature_vector(text2, model, num_features, index2word_set)
return abs(1 - spatial.distance.cosine(s1, s2))
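# Usage sketch (illustrative, not part of the original sources): compare two small snippets
# with a word2vec-format vector file exported by the code2vec model; the 'data/targets.txt'
# path and the 384-dimensional vectors are assumptions taken from the scripts above, and
# tokens missing from the vocabulary are simply skipped.
# from gensim.models import KeyedVectors
# model = KeyedVectors.load_word2vec_format('data/targets.txt', binary=False)
# sim = get_similarity('def add(a, b): return a + b',
#                      'def plus(x, y): return x + y', model, 384)
# print(sim)   # absolute cosine similarity of the averaged keyword vectors, in [0, 1]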
import os
multi_line_comments = ["'''", '"""']
def remove_string(line):
strIn = False
strCh = None
result = ''
i = 0
L = len(line)
    while i < L:
        if i + 3 <= L and line[i:i+3] in multi_line_comments:
            if not strIn:
                strIn = True
                strCh = line[i:i+3]
            elif line[i:i+3] == strCh:
                strIn = False
            i += 3  # skip all three quote characters
            continue
c = line[i]
i += 1
if c == '\'' or c == '\"':
if not strIn:
strIn = True
strCh = c
elif c == strCh:
strIn = False
continue
if strIn:
continue
result += c
return result
def using_multi_string(line, index):
line = line.strip()
for comment in multi_line_comments:
if line.find(comment, index) > 0:
return True
return False
def remove_unnecessary_comments(lines):
    # Warning: cannot reliably detect every multi-line comment, because a multi-line
    # comment is really just a multi-line string.
    # TODO: the multi-line string parser breaks when a single line uses more than one
    # string (or comment), e.g.
    #   a = ''' d ''' + '''
    #   abc ''' + '''
    #   x'''
result = []
multi_line = False
multi_string = False
strCh = None
for line in lines:
find_str_index = 0
if multi_string:
if strCh in line:
find_str_index = line.find(strCh) + 3
multi_string = False
strCh = None
result.append(line)
continue
if multi_line: # parsing multi-line comments
if strCh in line:
multi_line = False
strCh = None
continue
if using_multi_string(line, find_str_index):
i1 = line.find(multi_line_comments[0])
i2 = line.find(multi_line_comments[1])
if i1 < 0:
i1 = len(line) + 1
if i2 < 0:
i2 = len(line) + 1
if i1 < i2:
strCh = multi_line_comments[0]
else:
strCh = multi_line_comments[1]
result.append(line)
if line.count(strCh) % 2 != 0:
multi_string = True
continue
code = line.strip()
if code[:3] in multi_line_comments: # detect in-out of multi-line comments
if code.count(code[:3]) % 2 != 0: # comment count in odd numbers (start or end of comment is in the line)
multi_line = True
strCh = code[:3]
continue
comment_index = line.find('#')
if comment_index >= 0: # one line comment found
line = line[:comment_index]
line = line.rstrip() # remove rightmost spaces
if len(line) == 0: # no code in this line
continue
result.append(line) # add to results
return result
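# Illustrative example (not part of the original sources): strip comments and blank lines
# from a small snippet before further processing.
# src = ['import os  # stdlib', '', '# helper', "'''module", "docstring'''", 'x = 1']
# remove_unnecessary_comments(src)   -> ['import os', 'x = 1']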
def is_extension(f, ext):
return os.path.splitext(f)[1][1:] == ext
def _readdir_r(dirpath): # recursive helper for readdir
ret = []
for f in os.listdir(dirpath):
ret.append(os.path.join(dirpath, f))
return ret
def readdir(path): # recursively list every file under the directory
pathList = [path]
result = []
i = 0
while i < len(pathList):
f = pathList[i]
if os.path.isdir(f):
pathList += _readdir_r(f)
else:
result.append(f)
i += 1
return result
def read_file(path):
    with open(path, 'r', encoding='utf8') as f:
        return f.readlines()
def write_file(path, lines):
    with open(path, 'w', encoding='utf8') as f:
        for line in lines:
            if '\n' in line:
                f.write(line)
            else:
                f.write(line + '\n')
def readAll(path):
    with open(path, 'r', encoding='utf8') as f:
        return f.read()