sweagent.environment.swe_env.SWEEnv

Source code in SWE-agent/sweagent/environment/swe_env.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class SWEEnv:
    def __init__(
        self,
        *,
        deployment: AbstractDeployment,
        repo: Repo | RepoConfig | None,
        post_startup_commands: list[str],
        post_startup_command_timeout: int = 500,
        hooks: list[EnvHook] | None = None,
        name: str = "main",
    ):
        """This class represents the environment in which we solve the tasks.

        Args:
            deployment: SWE-ReX deployment instance
            repo: Repository configuration object, or anything following the `Repo` protocol
            post_startup_commands: Commands to execute before starting the agent
            post_startup_command_timeout: Timeout handed to `communicate` for each
                post-startup command
            hooks: Environment hooks (used to inject custom functionality)
                Equivalent to calling `add_hook` for each hook after initialization.
            name: Name of the environment
        """
        super().__init__()
        self.deployment = deployment
        self.repo = repo
        self._post_startup_commands = post_startup_commands
        self.post_startup_command_timeout = post_startup_command_timeout
        # The emoji literal had been mis-decoded (mojibake of the UTF-8 bytes of
        # U+1FAB4, "potted plant"); restored to the intended character.
        self.logger = get_logger("swea-env", emoji="🪴")
        self.name = name
        # Identity transform by default.
        self.clean_multi_line_functions = lambda x: x
        self._chook = CombinedEnvHooks()
        # Register through add_hook so every hook gets its on_init callback.
        for hook in hooks or []:
            self.add_hook(hook)

    @classmethod
    def from_config(cls, config: EnvironmentConfig, ds) -> Self:
        """Build an environment from a configuration object.

        This is the recommended constructor unless more flexibility is needed.
        The configuration is deep-copied first so that separate environment
        instances never share mutable configuration state.

        Args:
            config: Environment configuration.
            ds: Passed through to `get_deployment` together with the deployment
                configuration.

        :meta private:
        """
        own_config = config.model_copy(deep=True)
        deployment = get_deployment(own_config.deployment, ds)
        return cls(
            deployment=deployment,
            repo=own_config.repo,
            post_startup_commands=own_config.post_startup_commands,
            post_startup_command_timeout=own_config.post_startup_command_timeout,
            name=own_config.name,
        )

    def add_hook(self, hook: EnvHook) -> None:
        """Add `EnvHook` to the environment.
        This allows to inject custom functionality at different stages of the environment
        lifecycle, in particular to connect SWE-agent to a new interface (like a GUI).

        The hook's `on_init` callback is invoked with this environment before the
        hook is registered with the combined hook collection.

        :meta private:
        """
        hook.on_init(env=self)
        self._chook.add_hook(hook)
    def apply_patch(self,patch):
        """Apply a patch to the git repository in the environment.

        Three commands are tried in order (`git apply`, `git apply --reject`,
        then `patch --fuzz=5`), each reading the patch from stdin. The first one
        that succeeds ends the method.

        Args:
            patch: Patch text to apply.

        Raises:
            RuntimeError: If none of the commands managed to apply the patch.
        """
        apply_cmds = (
            "git apply --verbose",
            "git apply --verbose --reject",
            "patch --batch --fuzz=5 -p1 -i",
        )
        # The patch is arbitrary text: quote it for the shell so that quotes, `$`,
        # backticks and backslashes inside the diff reach the tool unmodified
        # (interpolating it inside `echo "..."` lets the shell rewrite or execute them).
        quoted_patch = shlex.quote(str(patch))
        for cmd in apply_cmds:
            try:
                # NOTE(review): `git_folder` is not assigned anywhere in this class as
                # shown here; if it is missing, the AttributeError is caught below and
                # every attempt fails. Confirm where this attribute is supposed to come from.
                apply_command = (
                    f'cd "/{self.git_folder}" && '
                    f"printf '%s\\n' {quoted_patch} | {cmd} - 2>/dev/null"
                )
                asyncio.run(self.deployment.runtime.run_in_session(BashAction(command=apply_command, timeout=120, check='raise')))
            except Exception as e:
                self.logger.warning(f"Failed to apply patch using {cmd}: {e}")
            else:
                self.logger.info("test patch applied successfully")
                return
        raise RuntimeError("Failed to apply patch using all methods.")
    def apply_test_patch(self):
        """Apply test patch if provided in deployment ds"""

        test_patch=self.deployment.ds.get('test_patch',None)
        if test_patch is not None:
            self.apply_patch(test_patch)
    def _calculate_reward(self):
        """Calculate reward for the environment"""
        try:
            return self.deployment._calculate_reward()
        except:
            return 0.0,{},{},''
    def start(self) -> None:
        """Start the environment and reset it to a clean state.
        :meta private:
        """
        self._init_deployment()
        self.reset()
        for command in self._post_startup_commands:
            self.communicate(command, check="raise", timeout=self.post_startup_command_timeout)

    def _copy_repo(self) -> None:
        """Clone/copy repository/codebase in container
        :meta private:
        """
        if self.repo is None:
            return

        folders = self.communicate(input="ls", check="raise").split("\n")
        if self.repo.repo_name in folders:
            return

        self._chook.on_copy_repo_started(repo=self.repo)
        self.repo.copy_repo(self.deployment)

    def hard_reset(self):
        """Resets the environment and deployment, i.e., completely restarts the
        deployment.

        Implemented as `close()` followed by `start()`.

        :meta private:
        """
        self.close()
        self.start()

    def reset(self):
        """Reset the environment to a clean state.
        Gets called by `start`, but can also be called independently to reset the
        environment to a clean state before a new attempt.

        Steps: change to `/`, copy the repository if needed, reset the repository,
        then fire the `on_environment_startup` hook.

        Returns:
            None. (No observation/info pair is returned by this method.)
        """
        self.communicate(input="cd /", check="raise")
        self._copy_repo()
        self._reset_repository()
        self._chook.on_environment_startup()

    def _reset_repository(self) -> None:
        """Clean repository of any modifications + Checkout base commit"""
        if self.repo is not None:
            self.logger.debug("Resetting repository %s to commit %s", self.repo.repo_name, self.repo.base_commit)
            # todo: Currently has swe-ft specific change: The original repo.copy isn't called, because the repo is already
            # present. However, reset --hard <BRANCH> also doesn't work. So modified it here to do a checkout instead.
            startup_commands = [
                f"cd /{self.repo.repo_name}",
                "export ROOT=$(pwd -P)",
                *self.repo.get_reset_commands(),
            ]
            self.communicate(
                input=" && ".join(startup_commands),
                check="raise",
                error_msg="Failed to clean repository",
                # Sometimes this is slow because it rebuilds some index
                timeout=120,
            )

    def close(self) -> None:
        """Shutdown SWE-ReX deployment etc.

        Stops the deployment, then notifies the registered hooks via `on_close`.
        """
        self.logger.info("Beginning environment shutdown...")
        asyncio.run(self.deployment.stop())
        self._chook.on_close()

    # MARK: Helper functions #

    def _init_deployment(
        self,
    ) -> None:
        """Start the deployment and open a bash session in it.

        Fires the `on_start_deployment` hook, starts the deployment, creates a bash
        session that sources `/root/.bashrc`, and exports a few baseline
        environment variables (UTF-8 locale, no pip progress bar, `cat` as pager).
        """
        self._chook.on_start_deployment()
        asyncio.run(self.deployment.start())
        asyncio.run(
            self.deployment.runtime.create_session(
                CreateBashSessionRequest(startup_source=["/root/.bashrc"], startup_timeout=10)
            )
        )
        self.set_env_variables({"LANG": "C.UTF-8", "LC_ALL": "C.UTF-8", "PIP_PROGRESS_BAR": "off", "PAGER": "cat"})
        self.logger.info("Environment Initialized")

    def interrupt_session(self):
        """Send a `BashInterruptAction` to the running session."""
        self.logger.info("Interrupting session")
        asyncio.run(self.deployment.runtime.run_in_session(BashInterruptAction()))

    # todo: return exit code?
    def communicate(
        self,
        input: str,
        timeout: int | float = 25,
        *,
        check: Literal["warn", "ignore", "raise"] = "ignore",
        error_msg: str = "Command failed",
    ) -> str:
        """Executes a command in the running shell. The details of this are handled by
        the SWE-ReX deployment/runtime.

        Args:
            input: input to send to container
            timeout: duration to wait for output
            check: `ignore`: do not extract exit code (more stable), `warn`: extract exit code and log error if
                exit code is non-zero, `raise`: raise error if exit code is non-zero
            error_msg: error message to raise if the command fails

        Returns:
            output: output from container

        Raises:
            RuntimeError: If `check` is `raise` and the command exits with a non-zero
                code. The environment is closed before the error is raised.
        """
        self.logger.log(logging.TRACE, "Input:\n%s", input)  # type: ignore
        # `check` is always a non-empty string, so testing its truthiness selected
        # "silent" even for `ignore`. Compare explicitly so that `ignore` really
        # skips exit-code extraction, as documented above.
        rex_check = "ignore" if check == "ignore" else "silent"
        r = asyncio.run(
            self.deployment.runtime.run_in_session(BashAction(command=input, timeout=timeout, check=rex_check))
        )
        output = r.output
        self.logger.log(logging.TRACE, "Output:\n%s", output)  # type: ignore
        # The exit code is only consulted when the caller asked for it to be checked.
        if check != "ignore" and r.exit_code != 0:
            self.logger.error(f"{error_msg}:\n{output}")
            msg = f"Command {input!r} failed ({r.exit_code=}): {error_msg}"
            self.logger.error(msg)
            if check == "raise":
                self.close()
                raise RuntimeError(msg)
        return output

    def read_file(self, path: str | PurePath, encoding: str | None = None, errors: str | None = None) -> str:
        """Read file contents from container

        Args:
            path: Absolute path to file
            encoding: Encoding to use when reading the file. None means default encoding.
                This is the same as the `encoding` argument of `Path.read_text()`
            errors: Error handling to use when reading the file. None means default error handling.
                This is the same as the `errors` argument of `Path.read_text()`

        Returns:
            file_contents: Contents of file as string
        """
        # The path is stringified because the request is built from a plain string.
        r = asyncio.run(
            self.deployment.runtime.read_file(ReadFileRequest(path=str(path), encoding=encoding, errors=errors))
        )
        return r.content

    def write_file(self, path: str | PurePath, content: str) -> None:
        """Write content to file in container

        Args:
            path: Path of the file to write; converted to `str` for the request.
            content: Text to write.
        """
        asyncio.run(self.deployment.runtime.write_file(WriteFileRequest(path=str(path), content=content)))

    def set_env_variables(self, env_variables: dict[str, str]) -> None:
        """Set environment variables in the environment."""
        if not env_variables:
            self.logger.debug("No environment variables to set")
            return
        _env_setters = [f"export {k}={shlex.quote(str(v))}" for k, v in env_variables.items()]
        command = " && ".join(_env_setters)
        self.communicate(command, check="raise")

    def execute_command(
        self,
        command: str,
        shell: bool = True,
        check: bool = False,
        env: dict[str, str] | None = None,
        cwd: str | None = None,
    ) -> None:
        """Execute a command in the environment independent of the session (i.e., as a subprocess)

        All arguments are forwarded unchanged to `RexCommand`; the result of the
        execution is not returned.
        """
        asyncio.run(
            self.deployment.runtime.execute(RexCommand(command=command, shell=shell, check=check, env=env, cwd=cwd))
        )

apply_patch

apply_patch(patch)

Apply patch to the git repository in the environment

Source code in SWE-agent/sweagent/environment/swe_env.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def apply_patch(self,patch):
    """Apply a patch to the git repository in the environment.

    Three commands are tried in order (`git apply`, `git apply --reject`,
    then `patch --fuzz=5`), each reading the patch from stdin. The first one
    that succeeds ends the method.

    Args:
        patch: Patch text to apply.

    Raises:
        RuntimeError: If none of the commands managed to apply the patch.
    """
    apply_cmds = (
        "git apply --verbose",
        "git apply --verbose --reject",
        "patch --batch --fuzz=5 -p1 -i",
    )
    # The patch is arbitrary text: quote it for the shell so that quotes, `$`,
    # backticks and backslashes inside the diff reach the tool unmodified
    # (interpolating it inside `echo "..."` lets the shell rewrite or execute them).
    quoted_patch = shlex.quote(str(patch))
    for cmd in apply_cmds:
        try:
            # NOTE(review): `git_folder` is read from `self`; confirm where this
            # attribute is assigned. If it is missing, the AttributeError is caught
            # below and every attempt fails.
            apply_command = (
                f'cd "/{self.git_folder}" && '
                f"printf '%s\\n' {quoted_patch} | {cmd} - 2>/dev/null"
            )
            asyncio.run(self.deployment.runtime.run_in_session(BashAction(command=apply_command, timeout=120, check='raise')))
        except Exception as e:
            self.logger.warning(f"Failed to apply patch using {cmd}: {e}")
        else:
            self.logger.info("test patch applied successfully")
            return
    raise RuntimeError("Failed to apply patch using all methods.")

apply_test_patch

apply_test_patch()

Apply test patch if provided in deployment ds

Source code in SWE-agent/sweagent/environment/swe_env.py
131
132
133
134
135
136
def apply_test_patch(self):
    """Apply the `test_patch` entry of the deployment's `ds`, when there is one.

    Does nothing if `ds` has no `test_patch` entry or the entry is `None`.
    """
    pending_patch = self.deployment.ds.get("test_patch")
    if pending_patch is None:
        return
    self.apply_patch(pending_patch)

_calculate_reward

_calculate_reward()

Calculate reward for the environment

Source code in SWE-agent/sweagent/environment/swe_env.py
137
138
139
140
141
142
def _calculate_reward(self):
    """Calculate reward for the environment.

    Delegates to the deployment's `_calculate_reward`.

    Returns:
        Whatever the deployment returns, or the fallback tuple
        `(0.0, {}, {}, '')` if the deployment raises an exception.
    """
    try:
        return self.deployment._calculate_reward()
    except Exception:
        # Best-effort: a failed evaluation counts as zero reward. Only
        # `Exception` is caught so that KeyboardInterrupt/SystemExit still
        # propagate, and the failure is logged instead of vanishing.
        self.logger.exception("Reward calculation failed; returning zero reward")
        return 0.0,{},{},''

sweagent.environment.swe_sbenv.SWEsbEnv

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
class SWEsbEnv:
    def __init__(
        self,
        *,
        deployment: AbstractDeployment,
        repo: Repo | RepoConfig | None,
        post_startup_commands: list[str],
        post_startup_command_timeout: int = 500,
        hooks: list[EnvHook] | None = None,
        name: str = "main",
    ):
        """This class represents the environment in which we solve the tasks.

        Attributes:
            deployment: MiniSandbox deployment
            repo: Repository configuration object, or anything following the `Repo` protocol
            post_startup_commands: Commands to execute before starting the agent
            post_startup_command_timeout: Timeout associated with the post-startup
                commands; stored on the instance
            hooks: Environment hooks (used to inject custom functionality)
                Equivalent to calling `add_hook` for each hook after initialization.
            name: Name of the environment
        """
        super().__init__()
        self.deployment = deployment
        self.repo = repo
        self._post_startup_commands = post_startup_commands
        self.post_startup_command_timeout = post_startup_command_timeout
        # The emoji literal had been mis-decoded (mojibake of the UTF-8 bytes of
        # U+1FAB4, "potted plant"); restored to the intended character.
        self.logger = get_logger("swea-env", emoji="🪴")
        self.name = name
        # Identity transform by default.
        self.clean_multi_line_functions = lambda x: x
        self._chook = CombinedEnvHooks()
        # Register through add_hook so every hook gets its on_init callback.
        for hook in hooks or []:
            self.add_hook(hook)

    @classmethod
    def from_config(cls, ds, bundles, config: EnvironmentConfig) -> Self:
        """Build an environment from a configuration object.

        Deployment creation goes through `get_deployment_from_config` to support
        the custom deployment type. The configuration is deep-copied first so
        that separate environment instances never share mutable state.

        Attributes:
            ds: a data item from dataset
            bundles: list of tool bundles to be installed in the environment
            config: environment configuration
        """
        own_config = config.model_copy(deep=True)
        deployment = get_deployment_from_config(config=own_config.deployment, ds=ds, bundles=bundles)
        return cls(
            deployment=deployment,
            repo=own_config.repo,
            post_startup_commands=own_config.post_startup_commands,
            post_startup_command_timeout=own_config.post_startup_command_timeout,
            name=own_config.name,
        )

    def add_hook(self, hook: EnvHook) -> None:
        """Add an `EnvHook` to the environment.

        The hook's `on_init` callback is invoked with this environment before the
        hook is registered with the combined hook collection.
        """
        hook.on_init(env=self)
        self._chook.add_hook(hook)

    def start(self) -> None:
        """Start the environment and reset it to a clean state.
            Step 1 : initialize deployment (session creation)
            Step 2 : reset environment (repo copy/reset)
            Step 3 : post init (venv preparation, tool installation, etc.)
        """
        Time_data={}
        time_data=self._init_deployment()
        Time_data['init_deployment']=time_data
        reset_time = time.time()
        self.reset()
        reset_end_time = time.time()
        reset_data={"reset_duration": reset_end_time - reset_time,"reset_start_time":reset_time,"reset_end_time":reset_end_time}
        Time_data['reset_deployment']=reset_data
        self._chook.on_post_init()
        post_init_time_data=self.deployment.post_init(self)

        Time_data.update(post_init_time_data)
        return Time_data


        # for command in self._post_startup_commands:
        #     self.communicate(command, check="raise", timeout=self.post_startup_command_timeout)
    def _copy_repo(self):
        """Clone/copy repository/codebase in mini-sandbox environment."""
        if self.repo is None:
            return True

        self._chook.on_copy_repo_started(repo=self.repo)
        return self.repo.copy2(deployment=self.deployment,try_count=3,local_path=self.deployment.cached_git_dir)

    def hard_reset(self):
        """Resets the environment and deployment, i.e., completely restarts the
        deployment.

        Implemented as `close()` followed by `start()`; the timing data returned
        by `start()` is discarded.
        """
        self.close()
        self.start()

    def reset(self):
        """Reset the environment to a clean state.
        Gets called by `start`, but can also be called independently to reset the
        environment to a clean state before a new attempt.

        """
       #self.communicate(input="cd /", check="raise")

        if not self._copy_repo():
            self._reset_repository()

        self._chook.on_environment_startup()
    def apply_test_patch(self):
        """apply test patch only"""

        test_patch=self.deployment.ds.get('test_patch',None)
        if test_patch is not None:
            apply_patch(sandbox_root=self.deployment.root_dir,git_folder=self.deployment.git_folder,patch_str=test_patch,instance_id=self.deployment.root_dir,reverse=False)
    def pre_check(self):
        """This funtion is used to apply a golden patch if provided and run eval scripts right after repo is copied and 
        reset and installed. (mainly used to check if the env is valid before running
        any agent steps.)
        Note that for swesmith data_type, the patch is applied reversely
        for swebench data_type, the patch is applied normally
        """
        # if self.deployment.eval_dir!="":
        #     path= f"{self.deployment.eval_dir}/preds.json"
        #     # load json
        #     import json
        #     with open(path, 'r') as f:
        #         preds = json.load(f)
        #     current_pred = preds.get(self.deployment.ds['instance_id'],None)
        #     if current_pred is not None:
        #         patch = current_pred.get('patch',None)
        #         if patch=='':
        #             patch = None
        #     else:
        #         patch = None
        # else:
        patch=self.deployment.ds.get('patch',None)
        test_patch=self.deployment.ds.get('test_patch',None)

        if self.deployment._config.data_type=='swesmith':
            if patch is not None:
                apply_patch(sandbox_root=self.deployment.root_dir,git_folder=self.deployment.git_folder,patch_str=patch,instance_id=self.deployment.root_dir,reverse=True)
            if test_patch is not None:
                apply_patch(sandbox_root=self.deployment.root_dir,git_folder=self.deployment.git_folder,patch_str=test_patch,instance_id=self.deployment.root_dir,reverse=False)
            self.logger.info("Applied golden patch to repository")
        elif self.deployment._config.data_type=='swebench':
            if patch is not None and patch !='':
                apply_patch(sandbox_root=self.deployment.root_dir,git_folder=self.deployment.git_folder,patch_str=patch,instance_id=self.deployment.root_dir,reverse=False)

            if test_patch is not None:
                apply_patch(sandbox_root=self.deployment.root_dir,git_folder=self.deployment.git_folder,patch_str=test_patch,instance_id=self.deployment.root_dir,reverse=False)
            self.logger.info("Applied golden patch to repository")
        reward,f2p_dic,p2p_dic,output=self._calculate_reward(p2p=0,f2p=0)
        print('reward',reward)
        self._reset_repository()
        self.deployment.reset()
        return reward,f2p_dic,p2p_dic,output
        # if reward==0:
        #     print("Pre-check failed: f2p_dic:",f2p_dic,"p2p_dic:",p2p_dic)
        #     raise RuntimeError("Pre-check failed")
        # else:
        #     print("Pre-check passed")

        # reset the repository again to remove any changes made by the eval scripts


    def _reset_repository(self) -> None:
        """Clean repository of any modifications + Checkout base commit"""
        if self.repo is not None:
            self.logger.debug("Resetting repository %s to commit %s", self.repo.repo_name, self.repo.base_commit)
            # todo: Currently has swe-ft specific change: The original repo.copy isn't called, because the repo is already
            # present. However, reset --hard <BRANCH> also doesn't work. So modified it here to do a checkout instead.
            startup_commands = [
                f"cd /{self.repo.git_folder}",
                "export ROOT=$(pwd -P)",
                *self.repo.get_reset_commands(),
            ]
            try_count=6
            count=0
            while count<try_count:
                try:
                    self.communicate(
                        input=" && ".join(startup_commands),
                        check="raise",
                        error_msg="Failed to clean repository",
                        # Sometimes this is slow because it rebuilds some index
                        timeout=120,
                    )
                    break
                except Exception as e:
                    count+=1
                    if count==try_count:
                        raise e


        # if self.deployment._config.data_type=='swesmith':
        #     patch=self.deployment.ds['patch']
        #     apply_patch(sandbox_root=self.deployment.root_dir,git_folder=self.deployment.git_folder,patch_str=patch,instance_id=self.deployment.root_dir,reverse=False)
        #     self.logger.info("Applied patch to repository")
        clean_diff_commands = [
        f"cd /{self.deployment.git_folder}",
        "git config user.email setup@swebench.config",
        "git config user.name SWE-bench",
        "git commit --allow-empty -am SWE-bench",
        ]
        self.communicate(
            input=" && ".join(clean_diff_commands),
            check="raise",
            error_msg="Failed to clean repository",
            # Sometimes this is slow because it rebuilds some index
            timeout=120,
        )
    def close(self) -> None:
        """Shut down the deployment, then notify the registered hooks via `on_close`."""
        self.logger.info("Beginning environment shutdown...")
        asyncio.run(self.deployment.stop())
        self._chook.on_close()

    # MARK: Helper functions #

    def _init_deployment(
        self,
    ) -> dict[str, float]:
        """
        Handles initialization. Creates the runtime and starts a session.

        Returns:
            Timing data with the keys `session_duration`, `session_start_time`,
            `session_end_time` (deployment start + session creation) and
            `env_setup_duration` (exporting the baseline environment variables).
        """
        session_create_time = time.time()
        #self._chook.on_start_deployment()
        asyncio.run(self.deployment.start())
        # The session sources /root/.bashrc and runs the deployment's startup command.
        asyncio.run(
            self.deployment.runtime.create_session(
                CreateSandboxBashSessionRequest(startup_source=["/root/.bashrc"], startup_timeout=60,startup_cmd=self.deployment.startup())
            )
        )
        session_end_time = time.time()
        time_data={"session_duration": session_end_time - session_create_time,"session_start_time":session_create_time,"session_end_time":session_end_time}

        env_setup_time = time.time()
        self.set_env_variables({"LANG": "C.UTF-8", "LC_ALL": "C.UTF-8", "PIP_PROGRESS_BAR": "off", "PAGER": "cat"})
        env_setup_end_time = time.time()
        time_data["env_setup_duration"]=env_setup_end_time - env_setup_time
        self.logger.info("Environment Initialized")
        return time_data
    def interrupt_session(self):
        """Send a `BashInterruptAction` to the running session."""
        self.logger.info("Interrupting session")
        asyncio.run(self.deployment.runtime.run_in_session(BashInterruptAction()))

    # todo: return exit code?
    def communicate(
        self,
        input: str,
        timeout: int | float = 25,
        *,
        check: Literal["warn", "ignore", "raise"] = "ignore",
        error_msg: str = "Command failed",
    ) -> str:
        """Executes a command in the running shell. The details of this are handled by
        the SWE-ReX deployment/runtime.

        Args:
            input: input to send to container
            timeout: duration to wait for output
            check: `ignore`: do not act on the exit code, `warn`: log an error if the
                exit code is non-zero, `raise`: raise error if exit code is non-zero
            error_msg: error message to raise if the command fails

        Returns:
            output: output from the session with its final line removed

        Raises:
            RuntimeError: If `check` is `raise` and the exit code is non-zero. The
                environment is closed before the error is raised.
        """
        self.logger.log(logging.TRACE, "Input:\n%s", input)  # type: ignore
        # NOTE(review): `check` is always a non-empty string, so this always selects
        # "silent", even for check="ignore" - confirm whether that is intended.
        rex_check = "silent" if check else "ignore"
        r = asyncio.run(
            self.deployment.runtime.run_in_session(BashAction(command=input, timeout=timeout, check=rex_check))
        )
        # Drop everything after the last newline of the raw output.
        # NOTE(review): presumably strips a trailing line added by the sandbox session - TODO confirm.
        output ='\n'.join(r.output.split('\n')[:-1])

        self.logger.log(logging.TRACE, "Output:\n%s", output)  # type: ignore
        if check != "ignore" and r.exit_code != 0:
            self.logger.error(f"{error_msg}:\n{output}")
            msg = f"Command {input!r} failed ({r.exit_code=}): {error_msg}"
            self.logger.error(msg)
            if check == "raise":
                self.close()
                raise RuntimeError(msg)
        return output
    def get_patch(self,dir="/res.patch"):
        """Get patch from mini-sandbox environment"""
        self.deployment.get_patch(dir)

    def read_file(self, path: str | PurePath, encoding: str | None = None, errors: str | None = None) -> str:
        """Read file contents from the sandbox.

        Args:
            path: Path of the file; it is appended to the deployment config's
                `root_dir` by plain string concatenation before reading.
            encoding: Passed through to the read request.
            errors: Passed through to the read request.

        Returns:
            The `content` of the read response.
        """
        root_dir=self.deployment._config.root_dir
        # Plain concatenation: `path` is expected to start with "/" - TODO confirm with callers.
        tg_path=root_dir+str(path)
        r = asyncio.run(
            self.deployment.runtime.read_file(ReadFileRequest(path=str(tg_path), encoding=encoding, errors=errors))
        )
        return r.content

    def write_file(self, path: str | PurePath, content: str) -> None:
        """Write content to a file through the deployment runtime.

        NOTE(review): unlike `read_file`, the path is not prefixed with the
        deployment's `root_dir` here - confirm this asymmetry is intended.
        """
        asyncio.run(self.deployment.runtime.write_file(WriteFileRequest(path=str(path), content=content)))

    def set_env_variables(self, env_variables: dict[str, str]) -> None:
        """Set environment variables in the environment."""
        if not env_variables:
            self.logger.debug("No environment variables to set")
            return
        _env_setters = [f"export {k}={shlex.quote(str(v))}" for k, v in env_variables.items()]
        command = " && ".join(_env_setters)
        self.communicate(command, check="raise")
    def _calculate_reward(self,p2p=1,f2p=1):
        """Calculate reward for the environment"""
        try:
            return self.deployment._calculate_reward(p2p=p2p,f2p=f2p)
        except:
            return 0.0,{},{},''
    def execute_command(
        self,
        command: str,
        shell: bool = True,
        check: bool = False,
        env: dict[str, str] | None = None,
        cwd: str | None = None,
    ) -> None:
        """Execute a command through the deployment runtime's `execute`.

        All arguments are forwarded unchanged to `RexCommand`; the result of the
        execution is not returned.
        """
        asyncio.run(
            self.deployment.runtime.execute(RexCommand(command=command, shell=shell, check=check, env=env, cwd=cwd))
        )

__init__

__init__(*, deployment, repo, post_startup_commands, post_startup_command_timeout=500, hooks=None, name='main')

This class represents the environment in which we solve the tasks.

Attributes:
  • deployment –

    MiniSandbox deployment

  • repo –

    Repository configuration object, or anything following the Repo protocol

  • post_startup_commands –

    Commands to execute before starting the agent

  • hooks –

    Environment hooks (used to inject custom functionality) Equivalent to calling add_hook for each hook after initialization.

  • name –

    Name of the environment

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(
    self,
    *,
    deployment: AbstractDeployment,
    repo: Repo | RepoConfig | None,
    post_startup_commands: list[str],
    post_startup_command_timeout: int = 500,
    hooks: list[EnvHook] | None = None,
    name: str = "main",
):
    """This class represents the environment in which we solve the tasks.

    Attributes:
        deployment: MiniSandbox deployment
        repo: Repository configuration object, or anything following the `Repo` protocol
        post_startup_commands: Commands to execute before starting the agent
        post_startup_command_timeout: Timeout associated with the post-startup
            commands; stored on the instance
        hooks: Environment hooks (used to inject custom functionality)
            Equivalent to calling `add_hook` for each hook after initialization.
        name: Name of the environment
    """
    super().__init__()
    self.deployment = deployment
    self.repo = repo
    self._post_startup_commands = post_startup_commands
    self.post_startup_command_timeout = post_startup_command_timeout
    # The emoji literal had been mis-decoded (mojibake of the UTF-8 bytes of
    # U+1FAB4, "potted plant"); restored to the intended character.
    self.logger = get_logger("swea-env", emoji="🪴")
    self.name = name
    # Identity transform by default.
    self.clean_multi_line_functions = lambda x: x
    self._chook = CombinedEnvHooks()
    # Register through add_hook so every hook gets its on_init callback.
    for hook in hooks or []:
        self.add_hook(hook)

apply_test_patch

apply_test_patch()

apply test patch only

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
164
165
166
167
168
169
def apply_test_patch(self):
    """Apply only the data item's test patch to the sandboxed repository.

    Nothing happens when the data item has no ``test_patch`` entry or the entry
    is ``None``. The patch is applied forwards (``reverse=False``).
    """
    sandbox = self.deployment
    patch_text = sandbox.ds.get('test_patch', None)
    if patch_text is None:
        return
    # NOTE(review): instance_id receives the sandbox root directory, as in the
    # other apply_patch calls of this class - confirm that this is intended.
    apply_patch(
        sandbox_root=sandbox.root_dir,
        git_folder=sandbox.git_folder,
        patch_str=patch_text,
        instance_id=sandbox.root_dir,
        reverse=False,
    )

communicate

communicate(input, timeout=25, *, check='ignore', error_msg='Command failed')

Executes a command in the running shell. The details of this are handled by the SWE-ReX deployment/runtime.

Parameters:
  • input (str) –

    input to send to container

  • timeout (int | float, default: 25 ) –

    duration to wait for output

  • check (Literal['warn', 'ignore', 'raise'], default: 'ignore' ) –

    ignore: do not extract exit code (more stable), warn: extract exit code and log error if exit code is non-zero, raise: raise error if exit code is non-zero

  • error_msg (str, default: 'Command failed' ) –

    error message to raise if the command fails

Returns:
  • output( str ) –

    output from gym environment

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
def communicate(
    self,
    input: str,
    timeout: int | float = 25,
    *,
    check: Literal["warn", "ignore", "raise"] = "ignore",
    error_msg: str = "Command failed",
) -> str:
    """Run a command in the deployment's shell session and return its output.

    Args:
        input: command to send to the shell session
        timeout: duration to wait for output; forwarded to the `BashAction`
        check: `ignore`: the exit code is not inspected, `warn`: log an error if the
            exit code is non-zero, `raise`: additionally close the environment and
            raise an error
        error_msg: message included in the log entries and in the raised error

    Returns:
        output: the session output with everything from the last newline onwards
            removed (an empty string if the output contains no newline)

    Raises:
        RuntimeError: if `check` is `raise` and the exit code is non-zero
    """
    self.logger.log(logging.TRACE, "Input:\n%s", input)  # type: ignore
    # NOTE(review): every documented `check` value is a non-empty string, so this
    # always selects "silent"; "ignore" is only reachable with a falsy `check`.
    if check:
        rex_check = "silent"
    else:
        rex_check = "ignore"
    runtime = self.deployment.runtime
    r = asyncio.run(runtime.run_in_session(BashAction(command=input, timeout=timeout, check=rex_check)))
    # Keep only the text that precedes the final newline.
    output = r.output.rpartition('\n')[0]

    self.logger.log(logging.TRACE, "Output:\n%s", output)  # type: ignore
    if check == "ignore" or r.exit_code == 0:
        return output

    self.logger.error(f"{error_msg}:\n{output}")
    msg = f"Command {input!r} failed ({r.exit_code=}): {error_msg}"
    self.logger.error(msg)
    if check == "raise":
        # Shut the environment down before surfacing the failure.
        self.close()
        raise RuntimeError(msg)
    return output

from_config classmethod

from_config(ds, bundles, config)

Create an environment instance from a configuration object. We change the creation of deployment here to support our custom deployment creation.

Attributes:
  • ds –

    a data item from dataset

  • bundles –

    list of tool bundles to be installed in the environment

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@classmethod
def from_config(cls, ds, bundles, config: EnvironmentConfig) -> Self:
    """Build an environment instance from a configuration object.

    The deployment is created through `get_deployment_from_config` so that the
    custom deployment receives the data item and the tool bundles.

    Args:
        ds: a data item from dataset
        bundles: list of tool bundles to be installed in the environment
        config: environment configuration; it is deep-copied and never mutated

    Returns:
        A new instance created from the copied configuration. No hooks are
        passed to the constructor.
    """
    # Work on a private deep copy so separate instances never share config state.
    own_config = config.model_copy(deep=True)
    deployment = get_deployment_from_config(config=own_config.deployment, ds=ds, bundles=bundles)
    init_kwargs = {
        "deployment": deployment,
        "repo": own_config.repo,
        "post_startup_commands": own_config.post_startup_commands,
        "post_startup_command_timeout": own_config.post_startup_command_timeout,
        "name": own_config.name,
    }
    return cls(**init_kwargs)

get_patch

get_patch(dir='/res.patch')

Get patch from mini-sandbox environment

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
340
341
342
def get_patch(self, dir="/res.patch"):
    """Ask the mini-sandbox deployment to produce the patch.

    Args:
        dir: value forwarded unchanged to `deployment.get_patch`
            (presumably the patch file path - confirm against the deployment).

    Returns:
        None. Whatever the deployment call returns is not propagated.
    """
    sandbox = self.deployment
    sandbox.get_patch(dir)

hard_reset

hard_reset()

Resets the environment and deployment, i.e., completely restarts the deployment.

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
145
146
147
148
149
150
def hard_reset(self):
    """Completely restart the environment together with its deployment.

    Calls `close` and then `start`; the value returned by `start` is discarded.
    """
    # Tear everything down first ...
    self.close()
    # ... then bring it back up from scratch.
    self.start()

pre_check

pre_check()

This function applies the golden patch (if provided) and runs the eval scripts right after the repo is copied, reset, and installed. It is mainly used to check that the environment is valid before running any agent steps. Note that for the swesmith data_type the patch is applied in reverse, whereas for the swebench data_type it is applied normally.

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def pre_check(self):
    """Apply the golden patch (if provided), run the evaluation and restore the repo.

    Intended to run right after the repository has been copied, reset and
    installed, mainly to check that the environment is valid before any agent
    step is taken.

    Patch handling depends on the deployment's ``data_type``:
      * ``swesmith``: the golden patch is applied in reverse, the test patch normally.
      * ``swebench``: a non-empty golden patch and the test patch are applied normally.
      * any other value: no patch is applied.

    Returns:
        The ``(reward, f2p_dic, p2p_dic, output)`` tuple produced by
        `_calculate_reward(p2p=0, f2p=0)`.
    """
    # NOTE: the patch always comes from the data item. A disabled variant read it
    # from "<eval_dir>/preds.json" (keyed by instance_id) when eval_dir was set.
    sandbox = self.deployment
    golden_patch = sandbox.ds.get('patch', None)
    tests_patch = sandbox.ds.get('test_patch', None)
    data_type = sandbox._config.data_type

    if data_type in ('swesmith', 'swebench'):
        is_swesmith = data_type == 'swesmith'
        # swesmith applies any non-None golden patch (in reverse); swebench
        # additionally skips an empty-string patch and applies it forwards.
        if golden_patch is not None and (is_swesmith or golden_patch != ''):
            apply_patch(
                sandbox_root=sandbox.root_dir,
                git_folder=sandbox.git_folder,
                patch_str=golden_patch,
                instance_id=sandbox.root_dir,
                reverse=is_swesmith,
            )
        if tests_patch is not None:
            apply_patch(
                sandbox_root=sandbox.root_dir,
                git_folder=sandbox.git_folder,
                patch_str=tests_patch,
                instance_id=sandbox.root_dir,
                reverse=False,
            )
        self.logger.info("Applied golden patch to repository")

    result = self._calculate_reward(p2p=0, f2p=0)
    reward, f2p_dic, p2p_dic, output = result
    print('reward', reward)
    # Undo the patches so the agent starts from a clean repository and deployment.
    self._reset_repository()
    self.deployment.reset()
    return reward, f2p_dic, p2p_dic, output

reset

reset()

Reset the environment to a clean state. Gets called by start, but can also be called independently to reset the environment to a clean state before a new attempt.

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
152
153
154
155
156
157
158
159
160
161
162
163
def reset(self):
    """Bring the environment back to a clean state.

    `start` calls this, but it can also be invoked on its own to clean the
    environment before a new attempt.
    """
    copy_result = self._copy_repo()
    if not copy_result:
        # _copy_repo reported a falsy result, so the repository is reset
        # explicitly (presumably because no pristine archive was restored - confirm).
        self._reset_repository()

    self._chook.on_environment_startup()

set_env_variables

set_env_variables(env_variables)

Set environment variables in the environment.

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
357
358
359
360
361
362
363
364
def set_env_variables(self, env_variables: dict[str, str]) -> None:
    """Export the given variables in the environment's shell session.

    All variables are set with a single ``export A=... && export B=...`` command
    sent through `communicate` with ``check="raise"``. Values are converted to
    strings and shell-quoted. An empty mapping only produces a debug log entry.
    """
    if env_variables:
        exports = " && ".join(
            f"export {key}={shlex.quote(str(value))}" for key, value in env_variables.items()
        )
        self.communicate(exports, check="raise")
        return
    self.logger.debug("No environment variables to set")

start

start()

Start the environment and reset it to a clean state. Step 1 : initialize deployment (session creation) Step 2 : reset environment (repo copy/reset) Step 3 : post init (venv preparation, tool installation, etc.)

Source code in SWE-agent/sweagent/environment/swe_sbenv.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def start(self) -> dict:
    """Start the environment, reset it to a clean state and report timings.

        Step 1 : initialize deployment (session creation)
        Step 2 : reset environment (repo copy/reset)
        Step 3 : post init (venv preparation, tool installation, etc.)

    Returns:
        A dict of timing data: ``init_deployment`` holds whatever
        `_init_deployment` returned, ``reset_deployment`` holds the start time,
        end time and duration of the reset step, and the mapping returned by
        `deployment.post_init` is merged in at the top level.
    """
    timings = {"init_deployment": self._init_deployment()}

    reset_start = time.time()
    self.reset()
    reset_end = time.time()
    timings["reset_deployment"] = {
        "reset_duration": reset_end - reset_start,
        "reset_start_time": reset_start,
        "reset_end_time": reset_end,
    }

    self._chook.on_post_init()
    post_init_timings = self.deployment.post_init(self)

    timings.update(post_init_timings)
    return timings

sweagent.environment.repo.GithubRepoRetryConfig

Bases: BaseModel

Configuration for cloning a GitHub repository with retry logic.

Source code in SWE-agent/sweagent/environment/repo.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
class GithubRepoRetryConfig(BaseModel):
    """Configuration for cloning a GitHub repository with retry logic."""

    git_folder: str
    """The folder name where the repository will be cloned."""

    github_url: str
    """The URL of the GitHub repository to clone."""

    base_commit: str = Field(default="HEAD")
    """The commit to reset the repository to. The default is HEAD,
    i.e., the latest commit. You can also set this to a branch name (e.g., `dev`),
    a tag (e.g., `v0.1.0`), or a commit hash (e.g., `a4464baca1f`).
    SWE-agent will then start from this commit when trying to solve the problem.
    """

    clone_timeout: float = 60
    """Timeout for git clone operation."""

    type: Literal["github"] = "github"
    """Discriminator for (de)serialization/CLI. Do not change."""

    model_config = ConfigDict(extra="forbid")

    def model_post_init(self, __context: Any) -> None:
        """Expand an ``owner/repo`` shorthand into a full https://github.com URL."""
        if self.github_url.count("/") == 1:
            self.github_url = f"https://github.com/{self.github_url}"

    @property
    def repo_name(self) -> str:
        """Name of the repository folder; always equal to `git_folder`. Cannot be set."""
        return self.git_folder

    def _get_url_with_token(self, token: str) -> str:
        """Return `github_url` with `token` embedded as https credentials.

        The URL is returned unchanged when `token` is empty, or when the URL
        already contains an ``@`` (a warning is logged in that case).
        """
        if not token:
            return self.github_url
        if "@" in self.github_url:
            logger.warning("Cannot prepend token to URL. '@' found in URL")
            return self.github_url
        _, separator, url_no_protocol = self.github_url.partition("://")
        if not separator:
            # No scheme present (e.g. "github.com/owner/repo"): partition() left
            # the third slot empty, so fall back to the URL itself instead of
            # producing "https://<token>@" with the host dropped.
            url_no_protocol = self.github_url
        return f"https://{token}@{url_no_protocol}"

    def copy_repo(self, deployment: AbstractDeployment):
        """Clone the repository into ``/<repo_name>`` with one runtime command.

        The command creates the folder, initialises a git repository, adds the
        (optionally token-authenticated) remote, fetches `base_commit` with depth 1
        and checks out ``FETCH_HEAD``. The token is read from the ``GITHUB_TOKEN``
        environment variable. There is no retry here; see `copy2` for that.
        """
        base_commit = self.base_commit
        github_token = os.getenv("GITHUB_TOKEN", "")
        url = self._get_url_with_token(github_token)
        asyncio.run(
            deployment.runtime.execute(
                Command(
                    command=" && ".join(
                        (
                            f"mkdir /{self.repo_name}",
                            f"cd /{self.repo_name}",
                            "git init",
                            f"git remote add origin {url}",
                            f"git fetch --depth 1 origin {base_commit}",
                            "git checkout FETCH_HEAD",
                            "cd ..",
                        )
                    ),
                    timeout=self.clone_timeout,
                    shell=True,
                    check=True,
                )
            ),
        )

    def _clone_with_retry(self, deployment: AbstractDeployment, try_count: int) -> None:
        """Initialise ``/<git_folder>`` in the session and fetch `base_commit`, retrying the fetch."""
        url = self._get_url_with_token(os.getenv("GITHUB_TOKEN", ""))

        # One-off setup: create the folder, init the repository, register the remote.
        asyncio.run(
            deployment.runtime.run_in_session(
                BashAction(
                    command=" && ".join(
                        (
                            'cd /',
                            f"mkdir /{self.git_folder}",
                            f"cd /{self.git_folder}",
                            "git init",
                            f"git remote add origin {url}",
                        )
                    ),
                    timeout=self.clone_timeout,
                    check='raise',
                )
            ),
        )

        # Only the fetch/checkout part is retried; the session is still inside the
        # repository folder after a failure because "cd .." runs last.
        fetch_command = " && ".join(
            (
                f"git fetch --depth 1 origin {self.base_commit}",
                "git checkout FETCH_HEAD",
                "cd ..",
            )
        )
        for attempt in range(1, try_count + 1):
            try:
                asyncio.run(
                    deployment.runtime.run_in_session(
                        BashAction(
                            command=fetch_command,
                            timeout=self.clone_timeout,
                            check='raise',
                        )
                    ),
                )
                return
            except Exception:
                print("Retry copy repo")
                if attempt == try_count:
                    # Bare raise keeps the original traceback intact.
                    raise
                logger.warning(f"Git clone failed, retrying {attempt}/{try_count}...")

    def copy2(self, deployment: AbstractDeployment, try_count=3, local_path=None):
        """Copy the repository into the deployment.

        If `local_path` is given and exists, the repository is extracted from that
        archive into ``<deployment.root_dir>/<git_folder>``. Otherwise it is cloned
        from GitHub, retrying the fetch up to `try_count` times. When the
        deployment config has ``force_rebuild`` set, an existing archive at
        `local_path` is deleted first, which forces the clone path.

        For repo matplotlib/matplotlib, the freetype and qhull tarballs from the
        local ``zip`` directory are additionally extracted to /testbed/build. This
        works around network limitations in the experimental environment; the
        original build downloads them from the internet.

        Args:
            deployment (AbstractDeployment): The deployment to copy the repository to.
            try_count (int): The number of attempts for fetching the repository.
            local_path (str|None): The local path to the repository archive.

        Returns:
            True if the repository was extracted from the local archive, False if
            it was cloned from GitHub.
        """
        current_dir = os.path.dirname(os.path.abspath(__file__))
        zip_dir = os.path.abspath(os.path.join(current_dir, "../../../zip"))

        if deployment._config.force_rebuild and local_path is not None and os.path.exists(local_path):
            os.remove(local_path)

        restored_from_archive = local_path is not None and os.path.exists(local_path)
        if restored_from_archive:
            tar_extract(local_path, os.path.join(deployment.root_dir, self.git_folder), threads=2)
        else:
            self._clone_with_retry(deployment, try_count)

        if deployment.ds['repo'] == 'matplotlib/matplotlib':
            # Stage the build dependencies offline, in the same order as before.
            for tarball in ("freetype-2.6.1.tar.gz", "qhull-2020-src-8.0.2.tgz"):
                deployment.extract_freetype_tarball(f"{zip_dir}/{tarball}", '/testbed/build')
        return restored_from_archive

    def get_reset_commands(self) -> list[str]:
        """Return the commands produced by `_get_git_reset_commands` for `base_commit`."""
        return _get_git_reset_commands(self.base_commit)

base_commit class-attribute instance-attribute

base_commit = Field(default='HEAD')

The commit to reset the repository to. The default is HEAD, i.e., the latest commit. You can also set this to a branch name (e.g., dev), a tag (e.g., v0.1.0), or a commit hash (e.g., a4464baca1f). SWE-agent will then start from this commit when trying to solve the problem.

clone_timeout class-attribute instance-attribute

clone_timeout = 60

Timeout for git clone operation.

git_folder instance-attribute

git_folder

The folder name where the repository will be cloned.

github_url instance-attribute

github_url

The URL of the GitHub repository to clone.

repo_name property

repo_name

Set automatically based on the repository name. Cannot be set.

type class-attribute instance-attribute

type = 'github'

Discriminator for (de)serialization/CLI. Do not change.

copy2

copy2(deployment, try_count=3, local_path=None)

This function copies the repository to the deployment. It first checks whether a local path is provided and exists. If so, it extracts the repository from the local path to the deployment. If not, it clones the repository from GitHub with retry logic. For repo matplotlib/matplotlib, it also extracts the freetype and qhull tarballs to /testbed/build. This works around the network limitations of our experimental environment; the original implementation downloads them directly from the internet.

Attributes:
  • deployment (AbstractDeployment) –

    The deployment to copy the repository to.

  • try_count (int) –

    The number of retry attempts for cloning the repository.

  • local_path (str | None) –

    The local path to the repository archive. If provided and exists, the repository will be extracted from this path instead of cloning from GitHub.

Source code in SWE-agent/sweagent/environment/repo.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
def copy2(self, deployment: AbstractDeployment, try_count=3, local_path=None):
    """Copy the repository into the deployment.

    If `local_path` is given and exists, the repository is extracted from that
    archive into ``<deployment.root_dir>/<git_folder>``. Otherwise it is cloned
    from GitHub inside the session, retrying the fetch up to `try_count` times.
    When the deployment config has ``force_rebuild`` set, an existing archive at
    `local_path` is deleted first, which forces the clone path.

    For repo matplotlib/matplotlib, the freetype and qhull tarballs from the
    local ``zip`` directory are additionally extracted to /testbed/build. This
    works around network limitations in the experimental environment; the
    original build downloads them from the internet.

    Args:
        deployment (AbstractDeployment): The deployment to copy the repository to.
        try_count (int): The number of attempts for fetching the repository.
        local_path (str|None): The local path to the repository archive.

    Returns:
        True if the repository was extracted from the local archive, False if it
        was cloned from GitHub.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    zip_dir = os.path.abspath(os.path.join(here, "../../../zip"))
    build_tarballs = (
        f"{zip_dir}/freetype-2.6.1.tar.gz",
        f"{zip_dir}/qhull-2020-src-8.0.2.tgz",
    )

    archive_given = local_path is not None
    if deployment._config.force_rebuild and archive_given and os.path.exists(local_path):
        os.remove(local_path)

    restored_from_archive = archive_given and os.path.exists(local_path)
    if restored_from_archive:
        # Unpack straight into deployment.root_dir/<git_folder>.
        tar_extract(local_path, os.path.join(deployment.root_dir, self.git_folder), threads=2)
    else:
        commit = self.base_commit
        remote_url = self._get_url_with_token(os.getenv("GITHUB_TOKEN", ""))

        # One-off setup: create the folder, init the repository, add the remote.
        setup_steps = (
            'cd /',
            f"mkdir /{self.git_folder}",
            f"cd /{self.git_folder}",
            "git init",
            f"git remote add origin {remote_url}",
        )
        asyncio.run(
            deployment.runtime.run_in_session(
                BashAction(command=" && ".join(setup_steps), timeout=self.clone_timeout, check='raise')
            ),
        )

        # Only the fetch/checkout part is retried.
        fetch_steps = (
            f"git fetch --depth 1 origin {commit}",
            "git checkout FETCH_HEAD",
            "cd ..",
        )
        for attempt in range(1, try_count + 1):
            try:
                asyncio.run(
                    deployment.runtime.run_in_session(
                        BashAction(command=" && ".join(fetch_steps), timeout=self.clone_timeout, check='raise')
                    ),
                )
                break
            except Exception as e:
                print("Retry copy repo")
                if attempt == try_count:
                    raise e
                logger.warning(f"Git clone failed, retrying {attempt}/{try_count}...")

    if deployment.ds['repo'] == 'matplotlib/matplotlib':
        # Stage the build dependencies from the local zip directory.
        for tarball in build_tarballs:
            deployment.extract_freetype_tarball(tarball, '/testbed/build')
    return restored_from_archive

copy_repo

copy_repo(deployment)

Clones the repository to the sandbox.

Source code in SWE-agent/sweagent/environment/repo.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def copy_repo(self, deployment: AbstractDeployment):
    """Clone the repository into ``/<repo_name>`` with a single runtime command.

    The command creates the folder, initialises a git repository, adds the remote
    (with the ``GITHUB_TOKEN`` environment variable embedded when it is set),
    fetches `base_commit` with depth 1 and checks out ``FETCH_HEAD``. The command
    is executed with ``check=True`` and `clone_timeout` as its timeout.
    """
    commit = self.base_commit
    remote_url = self._get_url_with_token(os.getenv("GITHUB_TOKEN", ""))
    repo_dir = f"/{self.repo_name}"
    steps = [
        f"mkdir {repo_dir}",
        f"cd {repo_dir}",
        "git init",
        f"git remote add origin {remote_url}",
        f"git fetch --depth 1 origin {commit}",
        "git checkout FETCH_HEAD",
        "cd ..",
    ]
    clone_command = Command(
        command=" && ".join(steps),
        timeout=self.clone_timeout,
        shell=True,
        check=True,
    )
    asyncio.run(deployment.runtime.execute(clone_command))