This documentation is for an older version (1.4.7) of Dagster. You can view the version of this page from our latest release below.
import time
from datetime import datetime
from typing import Optional
import jwt
import requests
from dagster import ConfigurableResource, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from pydantic import Field
def to_seconds(dt):
return (dt - datetime(1970, 1, 1)).total_seconds()
class GithubClient:
def __init__(
self, client, app_id, app_private_rsa_key, default_installation_id, hostname=None
) -> None:
self.client = client
self.app_private_rsa_key = app_private_rsa_key
self.app_id = app_id
self.default_installation_id = default_installation_id
self.installation_tokens = {}
self.app_token = {}
self.hostname = hostname
def __set_app_token(self):
# from https://developer.github.com/apps/building-github-apps/authenticating-with-github-apps/
# needing to self-sign a JWT
now = int(time.time())
# JWT expiration time (10 minute maximum)
expires = now + (10 * 60)
encoded_token = jwt.encode(
{
# issued at time
"iat": now,
# JWT expiration time
"exp": expires,
# GitHub App's identifier
"iss": self.app_id,
},
self.app_private_rsa_key,
algorithm="RS256",
)
self.app_token = {
"value": encoded_token,
"expires": expires,
}
def __check_app_token(self):
if ("expires" not in self.app_token) or (
self.app_token["expires"] < (int(time.time()) + 60)
):
self.__set_app_token()
def get_installations(self, headers=None):
if headers is None:
headers = {}
self.__check_app_token()
headers["Authorization"] = "Bearer {}".format(self.app_token["value"])
headers["Accept"] = "application/vnd.github.machine-man-preview+json"
request = self.client.get(
(
"https://api.github.com/app/installations"
if self.hostname is None
else f"https://{self.hostname}/api/v3/app/installations"
),
headers=headers,
)
request.raise_for_status()
return request.json()
def __set_installation_token(self, installation_id, headers=None):
if headers is None:
headers = {}
self.__check_app_token()
headers["Authorization"] = "Bearer {}".format(self.app_token["value"])
headers["Accept"] = "application/vnd.github.machine-man-preview+json"
request = requests.post(
(
f"https://api.github.com/app/installations/{installation_id}/access_tokens"
if self.hostname is None
else "https://{}/api/v3/app/installations/{}/access_tokens".format(
self.hostname, installation_id
)
),
headers=headers,
)
request.raise_for_status()
auth = request.json()
self.installation_tokens[installation_id] = {
"value": auth["token"],
"expires": to_seconds(datetime.strptime(auth["expires_at"], "%Y-%m-%dT%H:%M:%SZ")),
}
def __check_installation_tokens(self, installation_id):
if (installation_id not in self.installation_tokens) or (
self.installation_tokens[installation_id]["expires"] < (int(time.time()) + 60)
):
self.__set_installation_token(installation_id)
def execute(self, query, variables, headers=None, installation_id=None):
if headers is None:
headers = {}
if installation_id is None:
installation_id = self.default_installation_id
self.__check_installation_tokens(installation_id)
headers["Authorization"] = "token {}".format(
self.installation_tokens[installation_id]["value"]
)
request = requests.post(
(
"https://api.github.com/graphql"
if self.hostname is None
else f"https://{self.hostname}/api/graphql"
),
json={"query": query, "variables": variables},
headers=headers,
)
request.raise_for_status()
return request.json()
def create_issue(self, repo_name, repo_owner, title, body, installation_id=None):
if installation_id is None:
installation_id = self.default_installation_id
res = self.execute(
query="""
query get_repo_id($repo_name: String!, $repo_owner: String!) {
repository(name: $repo_name, owner: $repo_owner) {
id
}
}
""",
variables={"repo_name": repo_name, "repo_owner": repo_owner},
installation_id=installation_id,
)
return self.execute(
query="""
mutation CreateIssue($id: ID!, $title: String!, $body: String!) {
createIssue(input: {
repositoryId: $id,
title: $title,
body: $body
}) {
clientMutationId,
issue {
body
title
url
}
}
}
""",
variables={
"id": res["data"]["repository"]["id"],
"title": title,
"body": body,
},
installation_id=installation_id,
)
class GithubResource(ConfigurableResource):
github_app_id: int = Field(
description="Github Application ID, for more info see https://developer.github.com/apps/",
)
github_app_private_rsa_key: str = Field(
description=(
"Github Application Private RSA key text, for more info see"
" https://developer.github.com/apps/"
),
)
github_installation_id: Optional[int] = Field(
default=None,
description=(
"Github Application Installation ID, for more info see"
" https://developer.github.com/apps/"
),
)
github_hostname: Optional[str] = Field(
default=None,
description=(
"Github hostname. Defaults to `api.github.com`, for more info see"
" https://developer.github.com/apps/"
),
)
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True
def get_client(self) -> GithubClient:
return GithubClient(
client=requests.Session(),
app_id=self.github_app_id,
app_private_rsa_key=self.github_app_private_rsa_key,
default_installation_id=self.github_installation_id,
hostname=self.github_hostname,
)
[docs]@dagster_maintained_resource
@resource(
config_schema=GithubResource.to_config_schema(),
description="This resource is for connecting to Github",
)
def github_resource(context) -> GithubClient:
return GithubResource(**context.resource_config).get_client()