The Fittest Mario

An evolutionary computation approach to Super Mario Bros

This project was a collaboration with Pooneh Safayenikoo, Pedro Carrion Castagnola, Marshall Lindsay, Jeffrey Ruffolo at the University of Missouri. My role involved developing a generic state representation and optimization framework for the game that could be extended via evolutionary computation techniques. My teammates ran experiments with with specific aspects of the solver, including the fitness function, parent selection, mutations, and crossover methods.

Here is a demo video I made about the system (animations courtesy of Jeffrey Ruffolo), tailored to high school science students.

Super Mario Bros is a classic platformer video game in which the player tries to navigate Mario to the flag at the end of the stage. During this time, the player must successfully overcome obstacles and terrains in order to progress. Our project implements an evolutionary computation approach to finding a sequence of player actions to beat the game. We experiment with several operators for parent selection, crossover, mutation, and fitness evaluation. Results show successful convergence towards winning action sequences, with convergence rate responding to changes in strategies. We also observed different gameplay behaviors for chromosomes evaluated using different fitness functions.

Chromosome representation

The chromosome is represented as a sequence of game actions taken by the player (such as up, left, or right), which when emulated result in a fitness score for the given chromosome. We also used the index of game over for crossover and mutation methods because after this index the actions do not influence on the fitness. The chromosome in our implementation is an object that contains: a list of action sequences (integers), fitness score and index of game over. Figure 1 shows a representation of the chromosome.

Figure 1.

Simulation environment

We used gym-super-mario-bros, an OpenAI Gym environment for Super Mario Bros on The Nintendo Entertainment System (NES) using the nes-py emulator. This environment provides an easy-to-use, high-level API for making commands in Super Mario Bros. Figure 2 shows the general pipeline of our experiments.

Figure 2.

To implement this experiment pipeline, I made a simple boilerplate Python wrapper, MSBGeneticOptimizerEnv, that maintains the chomosome state and runs chromosome selection, crossover, and mutation:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
from __future__ import print_function

import multiprocessing, time, functools, gym_super_mario_bros, os, csv

from multiprocessing import Pool
from contextlib import contextmanager
from itertools import product
from nes_py.wrappers import BinarySpaceToDiscreteSpaceEnv
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, COMPLEX_MOVEMENT, RIGHT_ONLY
from abc import ABCMeta, abstractmethod

import pandas as pd
import numpy as np

try:
   import cPickle as pickle
except:
   import pickle


#MSBGeneticOptimizerEnv contains boilerplate code for the project
class MSBGeneticOptimizerEnv(object):
   """An environment wrapper for genetically optimizing mario smash brothers simulation ."""

   def __init__(self, max_steps=3000, num_chromosomes=40, action_encoding=RIGHT_ONLY, render=False, fitness_strategy="x_pos", session_file="", world=1, stage=1, version=0, noFrameSkip=False):
      if session_file != "":
         self.load_optimizer(session_file)
      else:
         self.max_steps = max_steps
         self.action_encoding = action_encoding
         self.render = render
         self.fitness_strategy = fitness_strategy  #score, x_pos, time, coins
         self.num_chromosomes = num_chromosomes
         self.world = world
         self.stage = stage
         self.version = version
         self.noFrameSkip = 'NoFrameskip' if noFrameSkip else ''
         self.init_chromosomes()

   def init_chromosomes(self):
      """Creates a new set of genes based on the number of parents fed in"""
      self.chromosomes = []
      for i in range(self.num_chromosomes):
         chromosome = [np.random.randint(0,len(self.action_encoding), self.max_steps), -1, -1]
         self.chromosomes.append(chromosome)
      self.evaluate_chromosomes()

   def save_optimizer(self, fname):
      print("saving optimizer state to ",fname)
      optimizer_state = Optimizer(self.max_steps, self.num_chromosomes, self.action_encoding, self.render, self.fitness_strategy, self.chromosomes, self.world, self.stage, self.version, self.noFrameSkip)
      with open(fname, "wb") as f:
         pickle.dump(optimizer_state, f)

   def load_optimizer(self, fname):
      print("loading optimier state from ",fname)
      with open(fname, "rb") as f:
         optimizer = pickle.load(f)
         self.max_steps = optimizer.max_steps
         self.action_encoding = optimizer.action_encoding
         self.render = optimizer.render
         self.fitness_strategy = optimizer.fitness_strategy
         self.num_chromosomes = optimizer.num_chromosomes
         self.chromosomes = optimizer.chromosomes
         self.world = optimizer.world
         self.stage = optimizer.stage
         self.version = optimizer.version
         self.noFrameSkip = optimizer.noFrameSkip


   def run_generations(self, ngens, fname):

      headers = ['generation', 'chromosome_num', self.fitness_strategy, 'avg_fitness']

      #If logging for the first time, set up csv file
      if fname:
         print("logging progress to " + fname)

         with open (fname, 'w') as csvfile:
            writer = csv.DictWriter(csvfile, delimiter=',', lineterminator='\n',fieldnames=headers)
            writer.writeheader()

      for gen in range(ngens):
         self.new_generation()
         self.evaluate_chromosomes()
         max_fitness, max_fitness_ix = self.get_max_fitness_chromosome()
         avg_fitness = self.get_avg_chromosome_fitness()

         #If writing progress to output, add this generation
         if fname:
            with open (fname, 'a') as csvfile:
               writer = csv.DictWriter(csvfile, delimiter=',', lineterminator='\n',fieldnames=headers)
               writer.writerow({'generation': gen, 'chromosome_num': max_fitness_ix, self.fitness_strategy: max_fitness, 'avg_fitness': avg_fitness})

         print("\n#################################")
         print("GENERATION",gen,"COMPLETE")
         print("Highest chromosome: ",max_fitness_ix,", fitness:",max_fitness)
         print("Average fitness: ", avg_fitness)
         print("####################################\n\n\n")


   def get_max_fitness_chromosome(self):
      """returns highest fitness of current chromosomes, along with its index"""
      max_fitness = -1
      max_fitness_ix = -1
      max_chromosome = []
      for cix, chromosome in enumerate(self.chromosomes):
         if chromosome[1] > max_fitness:
            max_fitness = chromosome[1]
            max_fitness_ix = cix

      return max_fitness, max_fitness_ix

   def get_avg_chromosome_fitness(self):
      total_fitness = 0
      for chromosome in self.chromosomes:
         total_fitness += chromosome[1]

      return int(total_fitness / len(self.chromosomes))

   @abstractmethod
   def new_generation(self):
      """
      Based on a chromosomes structure, updates the chromosomes by natural selection rules
      This is where the bulk of the evolutionary computation code will go
      Update here
      """
      #
      pass
      # For now now selection occurs, just keep current chromosomes


   def run_top_chromosome(self, render=False):
      """
      Retrieve the best-performing chromosome and play it.
      Override render argument in case only want to visualize on test round
      """
      max_fitness, max_fitness_ix = self.get_max_fitness_chromosome()

      if max_fitness == -1:
         print('Run top chromosome error: no fitnesses have been computed')

      with mariocontext(self) as env:
         done = True
         for step, action in enumerate(self.chromosomes[max_fitness_ix][0]):

            state, reward, done, info = env.step(action)
            if done or info['flag_get']:
               return

            if render: env.render()


   def evaluate_chromosome(self, input_tuple):
      """Evaluates a chromosome for it's fitness value and index of death"""

      chromosome_num, chromosome = input_tuple

      if chromosome[1] != -1:
         return chromosome

      with mariocontext(self) as env:

         best_fitness_step = 0

         state = env.reset()
         #Main evaluation loop for this chromosome
         for step, action in enumerate(chromosome[0]):

            #take step
            state, reward, done, info = env.step(action)

            if (info[self.fitness_strategy] > best_fitness_step):
               best_fitness_step = step

            #died or level beat
            if (done or info['flag_get']):
               break

            #print progress
            if step % 50 == 0:
               print("chromosome:",chromosome_num," step:", step," action:",action, "info:",info)

            #display on screen
            if self.render:
               env.render()

         chromosome[1], chromosome[2] = info[self.fitness_strategy], best_fitness_step

         print("chromosome",chromosome_num," done fitness ",self.fitness_strategy ,"= ",info[self.fitness_strategy])
         return chromosome


   def evaluate_chromosomes(self):
      """
      Given a gene structure, evaluates all genes for their fitness and stores it in the stucture
      This is what actually runs the training simulation
      Input: a gene structure with (possibly) empty fitnesses
      Output: a gene structure with computed fitnesses and index of death
      """

      bound_instance_method_alias = functools.partial(_instance_method_alias, self)
      with poolcontext(1) as pool:
         self.chromosomes = pool.map(bound_instance_method_alias, enumerate(self.chromosomes))


class Optimizer():
   """A basic container class for saving optimizer contents"""

   def __init__(self, max_steps, num_chromosomes, action_encoding, render, fitness_strategy, chromosomes, world, stage, version, noFrameSkip):
      self.max_steps = max_steps
      self.num_chromosomes = num_chromosomes
      self.action_encoding = action_encoding
      self.render = render
      self.fitness_strategy = fitness_strategy
      self.chromosomes = chromosomes
      self.world = world
      self.stage = stage
      self.version = version
      self.noFrameSkip = noFrameSkip


@contextmanager
def poolcontext(*args, **kwargs):
   pool = multiprocessing.Pool(*args, **kwargs)
   yield pool
   pool.terminate()

@contextmanager
def mariocontext(marioEnv):
   mario_env = 'SuperMarioBros' + marioEnv.noFrameSkip + '-' + str(marioEnv.world) + '-' + str(marioEnv.stage) + '-v' + str(marioEnv.version)
   env = gym_super_mario_bros.make(mario_env)
   env = BinarySpaceToDiscreteSpaceEnv(env, marioEnv.action_encoding)
   yield env
   env.close()


def _instance_method_alias(obj, arg):
   """
   Alias for instance method that allows the method to be called in a
   multiprocessing pool
   """
   return obj.evaluate_chromosome(arg)

This environment can be easily extended to customize crossover, parent selection, and mutation methods. Here is an example of an EvolutionEnv extending the MSBGeneticOptimizerEnv:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from msb_genetic_optimizer_env import MSBGeneticOptimizerEnv
import numpy as np


##The below class inherits everything from MSBGeneticOptimizerEnv, all you will need to do is modi
class EvolutionEnv(MSBGeneticOptimizerEnv):
   # Arguments: max_steps=10000, num_chromosomes=4, action_encoding=SIMPLE_MOVEMENT, render=False, reward="score", session_file=""
   def __init__(self, *args, **kwargs):
      super(EvolutionEnv, self).__init__(*args, **kwargs)

   def new_generation(self):
      """
      Based on a chromosomes structure, updates the chromosomes by natural selection rules
      This is where the bulk of the evolutionary computation code will go
      We will need to modify the chromosome structure in some
      """
        
      # elite selection for parent
      parents = self.select_parents(3, int(self.num_chromosomes/2))
      offspring = []

      # Crossover
      for i in range(int(np.floor(len(parents) / 2))):
         offspring.extend(self.crossover_chromosome_pair(parents[2 * i], parents[2 * i + 1],
                              point_num=2, points_before_death=True, normal_dist=True))

      # Mutation
      # using lambda = mu (numberof parents equal to the number of offsprings)
      # For each selected parent, will mutate around 20% of the max_steps actions
      mutations = round(0.2 * self.max_steps)
      for chromosome in parents:
         child = [chromosome[0].copy(), -1, chromosome[2]]
         self.mutate_chromosome(child, mutations)
         offspring.append(child)

      # (mu, lambda) Using only offspring to populate new generation here
      # self.chromosomes = offspring

      # (mu + lambda) Using parents + offspring to populate new generation here
      self.chromosomes = parents + offspring

   ###
   #  Basic implementation of parent selection
   #  - selection_type: 0: shuffle, 1: elite
   #  - mu: number of parents to select
   ###
   def select_parents(self, selection_type=1, mu=1):
      def shuffle_selection():
         np.random.shuffle(self.chromosomes)
         return self.chromosomes[:mu]

      def elite_selection(mu):
         parents = []
         # select mu best chromosomes
         best_chromosomes_index = sorted(range(len(self.chromosomes)), key=lambda x: self.chromosomes[x][1],
                                         reverse=True)
         # delete from chromosome list if it is not within the mu best
         for i in range(mu):
            parents.append(self.chromosomes[best_chromosomes_index[i]])
         return parents

      def linear_ranking_selection(mu):
         parents = []
         best_chromosomes_index = sorted(range(len(self.chromosomes)), key=lambda x: self.chromosomes[x][1], reverse=True)
         s = []
         s.append(0)
         for i in range(self.num_chromosomes):
            s.append(s[i]+((1.0/self.num_chromosomes)*(1.5-((i)/(self.num_chromosomes-1.0)))))
         for i in range(mu):
            r=s[self.num_chromosomes]*np.random.random_sample()
            for i in range(self.num_chromosomes):
               if s[i] <= r < s[i+1]:
                  parents.append(self.chromosomes[best_chromosomes_index[self.num_chromosomes-(i+1)]])
         return parents

      def roulette_wheel_selection(mu):
         parents = []
         total=0
         for i in range(self.num_chromosomes):
            total=total+self.chromosomes[i][1];
         s = []
         s.append(0)
         for i in range(self.num_chromosomes):
            s.append(s[i]+((self.chromosomes[i][1])/(float)(total)))
         for i in range(mu):
            r=np.random.random_sample()
            for i in range(self.num_chromosomes):
               if s[i] <= r < s[i+1]:
                  parents.append(self.chromosomes[i])
         return parents

      if selection_type == 0:
         return shuffle_selection()
      elif selection_type == 1:
         return elite_selection(mu)
      elif selection_type == 2:
         return linear_ranking_selection(mu)
      elif selection_type == 3:
         return roulette_wheel_selection(mu)

   def crossover_chromosome_pair(self, parent1, parent2, point_num=1, points_before_death=False, normal_dist=False):
      ###
      #	Implementation of crossover functionality
      #	- points: number of points to use in crossover
      #	- points_before_death: only consider points upto sooner death
      #	- normal_dist: sample crossover points from normal distribution
      ###
      def positive_normal():
         x = abs(np.random.randn() / 3)
         return max(1 - x, 0)

      point_range_max = min(len(parent1[0]), len(parent2[0])) if not points_before_death \
         else max(parent1[2], parent2[2])
      random = np.random.rand if not normal_dist else positive_normal

      crossover_points = [point_range_max]
      for i in range(point_num):
         crossover_points.append(int(round(point_range_max * random())))
      crossover_points.sort(reverse=True)

      child1, child2 = [parent1[0].copy(), -1, -1], [parent2[0].copy(), -1, -1]
      while len(crossover_points) > 1:
         point1 = crossover_points.pop()
         point2 = crossover_points.pop()
         child1[0][point1:point2] = parent2[0][point1:point2].copy()
         child2[0][point1:point2] = parent1[0][point1:point2].copy()

      return [child1, child2]

   def mutate_chromosome(self, chromosome, mutations=1):
   ###
   # Implementation of mutation operator
   # Mutations: number of mutations to make
   ###
      for i in range(int(mutations)):
         #mutation index: triangular distribution with: bound left=0, bound right and mode=index of game over
         mutation_index = int(np.ceil(np.random.triangular(left=0, mode=1, right=1) * chromosome[2]))
         #new_action: random within the allowed possible actions
         actions_size = len(self.action_encoding)
         #new_action = np.random.randint(low=0, high=actions_size)
         #new: prioritize actions between 1 to 4 (going right)
         new_action = np.random.randint(low=0, high=(actions_size+4))
         if (new_action >= actions_size):
            new_action = new_action - actions_size + 1
         chromosome[0][mutation_index] = new_action
      chromosome[1], chromosome[2] = -1, -1

After this high-level wrapper is complete, it can easily be used in a short script. Here is an example experiment:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#create a new optimizer environment and initialize data structure
optimizer = SelectionEnv(max_steps = 500, num_chromosomes=6, render=True)

#run the optimizer for the desired number of generations
optimizer.run_generations(3)

#serialize the data structure to a file. Pickle for effeciency 
optimizer.save_optimizer('mario-4-chromosome.p')

#see how the top performing chromosome looks in the simulator 
optimizer.run_top_chromosome(render=True)

#To load a previous environment, either create a new optimizer with the filename,or call the load optimizer function directly. Note this will overwrite the current state
optimizer2 = SelectionEnv(session_file ="mario-4-chromosome.p")
load the previous session and keep training
optimizer2.run_generations(1)
optimizer2.save_optimizer('mario-4-chromosome2.p') #save to an updated file 

My teammates then extended this environment to examine different evolutionary computation simulations. See our poster here for more information about these aspects of the project!