data = open('data_sdpa_fp16.txt', 'r').read()
data = data.strip().split('\n')
data = [line for line in data if "skipping" not in line]
# Lines alternate: a config line followed by a result line.
x = data[::2]
y = data[1::2]
x = x[:len(y)]  # drop a trailing config line with no matching result
x = [[int(v) for v in line.split(', ')] for line in x]
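# Assumed input format, inferred from the parsing above (values hypothetical):
#   32, 16, 1024, 64      <- batch_size, n_heads, seq_len, head_dim
#   FLASH, 1.27           <- fastest backend, perf ratio vs. the other backend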
# Keep only cases where the perf difference is significant (>10% either way).
nx = []
ny = []
for xi, yi in zip(x, y):
    yi = yi.split(', ')
    if float(yi[1]) > 1.1 or float(yi[1]) < 0.9:
        nx.append(xi)
        ny.append(yi[0])
x, y = nx, ny
from sklearn.preprocessing import PolynomialFeatures

feature_names = ['batch_size', 'n_heads', 'seq_len', 'head_dim']

def add_inverse(x, feature_names):
    # Append 1/feature for every original column (mutates the rows in place).
    for row in x:
        for j in range(len(row)):
            row.append(1 / row[j])
    feature_names += [f"1/{name}" for name in feature_names]
    return x, feature_names
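# For illustration, after add_inverse the feature list becomes:
#   ['batch_size', 'n_heads', 'seq_len', 'head_dim',
#    '1/batch_size', '1/n_heads', '1/seq_len', '1/head_dim']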

def add_polynomial(x, feature_names, degree=2):
    poly = PolynomialFeatures(degree=degree, interaction_only=True)
    x = poly.fit_transform(x)
    feature_names = list(poly.get_feature_names_out(input_features=feature_names))
    return x, feature_names
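# Note: interaction_only=True produces products of distinct columns only
# (e.g. batch_size * seq_len), never powers like seq_len**2; with degree=3
# that is a bias column plus all pairwise and three-way interactions.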
x_aug, feature_names = add_inverse(x, feature_names)
x_aug, feature_names = add_polynomial(x_aug, feature_names, degree=3)
y = [label == "FLASH" for label in y]  # binary target: FLASH vs. the other backend
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier, export_text
from sklearn.metrics import accuracy_score

# 70/30 train/validation split; random_state pins the shuffle for reproducibility.
X_train, X_val, y_train, y_val = train_test_split(
    x_aug, y, test_size=0.3, random_state=42)
param_grid = {
    'max_depth': [1],
    'min_samples_split': [2, 3, 4],
    'min_samples_leaf': [1, 2, 3],
}
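# With max_depth fixed at 1 the tree is a single-split stump, so the
# min_samples_* settings rarely change the learned rule; the grid mostly
# checks that the chosen split is stable.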

def cur_predict(x):
    # Hand-written heuristic currently used to pick the FLASH backend.
    batch_size, num_heads, query_lengths, head_dim, *_ = x
    threads_flash = batch_size * num_heads
    threads_cutlass = threads_flash * (query_lengths // 64)
    more_threads_cutlass = (threads_cutlass // 2) >= threads_flash
    small_threads_flash = threads_flash < 60
    large_head_dim = head_dim == 128
    is_flash = (small_threads_flash and more_threads_cutlass) or large_head_dim
    return is_flash
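# Reading of the heuristic: prefer FLASH either when FLASH itself would launch
# few thread blocks (batch_size * n_heads < 60) while the CUTLASS kernel,
# which also parallelizes over ~64-token sequence chunks, would launch at
# least twice as many, or when head_dim is 128.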

def eval_f(f, X, Y):
    # Accuracy of an arbitrary predictor f over a labeled dataset.
    y_pred = [f(xi) for xi in X]
    return accuracy_score(Y, y_pred)
results = []
# Train a decision tree classifier for every hyperparameter combination.
for max_depth in param_grid['max_depth']:
    for min_samples_split in param_grid['min_samples_split']:
        for min_samples_leaf in param_grid['min_samples_leaf']:
            dt = DecisionTreeClassifier(max_depth=max_depth,
                                        min_samples_split=min_samples_split,
                                        min_samples_leaf=min_samples_leaf,
                                        random_state=42)
            dt.fit(X_train, y_train)
            # Evaluate accuracy on the held-out validation set.
            y_pred = dt.predict(X_val)
            acc = accuracy_score(y_val, y_pred)
            results.append({
                'max_depth': max_depth,
                'min_samples_split': min_samples_split,
                'min_samples_leaf': min_samples_leaf,
                'accuracy': acc,
                'classifier': dt,
            })
# Sort by validation accuracy, best first, and print the grid results.
results = sorted(results, key=lambda r: r['accuracy'], reverse=True)
for result in results:
    print(f"max_depth={result['max_depth']}, "
          f"min_samples_split={result['min_samples_split']}, "
          f"min_samples_leaf={result['min_samples_leaf']}, "
          f"accuracy={result['accuracy']}")
# Dump the best tree as human-readable rules.
print(export_text(results[0]['classifier'], feature_names=feature_names))
# print(export_text(results[0]['classifier'], feature_names=['batch_size', 'n_heads', 'seq_len', 'head_dim', 'flash_threads', 'seq_len / flash_threads']))
# Accuracy of the existing hand-written heuristic on the full filtered dataset.
print(eval_f(cur_predict, x, y))