Browse Source

Move code into a class. Rename p to self.log_prefix. Move matching_issue_labels to issue['lower_labels']. Rename cmd to mode.

master
poikilos 3 years ago
parent
commit
dc76e2a9bd
  1. 726
      utilities/enissue.py

726
utilities/enissue.py

@ -34,10 +34,10 @@ def decode_safe(b):
return s
cmd = None
# me = sys.argv[0]
me = os.path.basename(__file__)
cmds = {
modes = {
"list": {
"help": ("List all issues. Provide one or more labels to narrow"
" down the list. Alternatively, provide a label only."
@ -71,7 +71,7 @@ def usage():
left_w = 10
spacer = " -- "
line_fmt = "{: <" + str(left_w) + "}" + spacer + "{}"
for name, command in cmds.items():
for name, command in modes.items():
hlp = command["help"]
print(line_fmt.format(name, hlp))
if len(command["examples"]) > 0:
@ -82,341 +82,413 @@ def usage():
print("")
page = None
prev_arg = None
match_number = None
for i in range(1, len(sys.argv)):
arg = sys.argv[i]
if arg.startswith("#"):
arg = arg[1:]
if (cmd is None) and (arg in cmds.keys()):
# don't set the command to list unless the enclosing case is
# true. If a label was specified, paging is handled in the
# other case.
if arg == "page":
cmd = "list"
else:
cmd = arg
else:
try:
i = int(arg)
if prev_arg == "page":
page = i
else:
match_number = i
except ValueError:
if (cmd is None) and (cmds.get(arg) is not None):
cmd = arg
class Repo:
caches_path = "/tmp/enissue"
def __init__(
self,
remote_user = "poikilos",
repo_name = "EnlivenMinetest",
api_repo_url_fmt = "https://api.github.com/repos/{ru}/{rn}",
page_size = 30,
c_issues_name_fmt = "issues_page={p}{q}.json",
):
'''
Keyword arguments:
remote_user -- The repo user may be used in api_repo_url_fmt.
repo_name -- The repo name may be used in api_repo_url_fmt.
api_repo_url_fmt -- a format string where {ru} is where a repo
user goes (if any), and {rn} is where a
repo name goes (if any).
page_size -- This must be set to the page size that is
compatible with the URL in api_repo_url_fmt, such
as exactly 30 for GitHub.
c_issues_name_fmt -- This is the format of the URL query tail
that shows a certain page by page number,
where {p} is the page number and {q} is any
additional query such as "&label=bug".
'''
self.page = None
self.remote_user = remote_user
self.repo_name = repo_name
self.c_remote_user_path = os.path.join(Repo.caches_path,
self.remote_user)
self.c_repo_path = os.path.join(self.c_remote_user_path,
self.repo_name)
self.api_repo_url_fmt = api_repo_url_fmt
self.repo_url = self.api_repo_url_fmt.format(
ru=remote_user,
rn=repo_name
)
self.issues_url = self.repo_url + "/issues"
self.labels_url = self.repo_url + "/labels"
self.page_size = page_size
self.log_prefix = "@ "
self.c_issues_name_fmt = c_issues_name_fmt
self.label_ids = [] # all label ids in the repo
self.labels = [] # all labels in the repo
def get_issues(self):
query_s = self.issues_url
this_page = 1
query_part = ""
# TODO: IF longer API query is implemented, put something in
# query_part like "&label=Bucket_Game"
if self.page is not None:
this_page = self.page
c_issues_name = self.c_issues_name_fmt.format(p=this_page,
q=query_part)
c_issues_path = os.path.join(self.c_repo_path, c_issues_name)
p = self.log_prefix
if os.path.isfile(c_issues_path):
# See <https://stackoverflow.com/questions/7430928/
# comparing-dates-to-check-for-old-files>
max_cache_delta = timedelta(hours=12)
cache_delta = datetime.now() - max_cache_delta
c_issues_mtime = os.path.getmtime(c_issues_path)
filetime = datetime.fromtimestamp(c_issues_mtime)
if filetime > cache_delta:
print(p+"Loading cache: \"{}\"".format(c_issues_path))
# print(p+"Cache time limit: {}".format(max_cache_delta)
print(p+"Cache expires: {}".format(filetime
+ max_cache_delta))
with open(c_issues_path) as json_file:
results = json.load(json_file)
# print(p+"The cache file has"
# " {} issue(s).".format(len(results)))
max_issue = None
for issue in results:
issue_n = issue.get("number")
if issue_n is not None:
if (max_issue is None) or (issue_n > max_issue):
max_issue = issue_n
print(p+"The highest cached issue# is {}.".format(
max_issue
))
return results
else:
if arg != "page":
# print("* adding criteria: {}".format(arg))
cmd = "list"
match_all_labels.append(arg)
prev_arg = arg
if cmd is None:
if len(match_all_labels) > 1:
cmd = "list"
if match_number is not None:
cmd = "issue"
valid_cmds = ["issue"]
for k, v in cmds.items():
valid_cmds.append(k)
if cmd is None:
print()
print()
usage()
print()
print()
exit(0)
elif cmd not in valid_cmds:
print()
print()
usage()
print()
print(cmd + " is not a valid command.")
print()
print()
exit(0)
print("")
# print("Loading...")
caches_path = "/tmp/enissue"
remote_user = "poikilos"
repo_name = "EnlivenMinetest"
c_remote_user_path = os.path.join(caches_path, remote_user)
c_repo_path = os.path.join(c_remote_user_path, repo_name)
api_repo_url_fmt = "https://api.github.com/repos/{}/{}"
repo_url = api_repo_url_fmt.format(remote_user, repo_name)
issues_url = repo_url + "/issues"
labels_url = repo_url + "/labels"
page_size = 30 # The GitHub API only lists 30 issues per page.
p = "@ "
def get_issues():
query_s = issues_url
c_issues_name_fmt = "issues_page={}{}.json"
this_page = 1
query_part = ""
# TODO: IF longer API query is implemented, put something in
# query_part like "&label=Bucket_Game"
if page is not None:
this_page = page
c_issues_name = c_issues_name_fmt.format(this_page, query_part)
c_issues_path = os.path.join(c_repo_path, c_issues_name)
if os.path.isfile(c_issues_path):
# See <https://stackoverflow.com/questions/7430928/
# comparing-dates-to-check-for-old-files>
max_cache_delta = timedelta(hours=12)
cache_delta = datetime.now() - max_cache_delta
c_issues_mtime = os.path.getmtime(c_issues_path)
filetime = datetime.fromtimestamp(c_issues_mtime)
if filetime > cache_delta:
print(p+"Loading cache: \"{}\"".format(c_issues_path))
# print(p+"Cache time limit: {}".format(max_cache_delta)
print(p+"Cache expires: {}".format(filetime
+ max_cache_delta))
with open(c_issues_path) as json_file:
results = json.load(json_file)
# print(p+"The cache file has"
# " {} issue(s).".format(len(results)))
max_issue = None
for issue in results:
issue_n = issue.get("number")
if issue_n is not None:
if (max_issue is None) or (issue_n > max_issue):
max_issue = issue_n
print(p+"The highest cached issue# is {}.".format(max_issue))
return results
print(p+"Cache time limit: {}".format(max_cache_delta))
print(p+"The cache has expired: \"{}\"".format(
c_issues_path
))
else:
print(p+"Cache time limit: {}".format(max_cache_delta))
print(p+"The cache has expired: \"{}\"".format(c_issues_path))
else:
print(p+"There is no cache for \"{}\"".format(c_issues_path))
print(p+"There is no cache for \"{}\"".format(
c_issues_path
))
if page is not None:
query_s = issues_url + "?page=" + str(page)
try:
response = request.urlopen(query_s)
except urllib.error.HTTPError as e:
print(p+"You may be able to view the issues on GitHub")
print(p+"at the 'html_url', and a login may be required.")
print(p+"The URL \"{}\" is not accessible, so you may have"
" exceeded the rate limit and be blocked"
" temporarily:".format(query_s))
print(str(e))
return None
response_s = decode_safe(response.read())
if not os.path.isdir(c_repo_path):
os.makedirs(c_repo_path)
print(p+"Saving issues cache: {}".format(c_issues_path))
with open(c_issues_path, "w") as outs:
outs.write(response_s)
results = json.loads(response_s)
return results
# TODO: get labels another way, and make this conditional:
# if cmd == "list":
issues = get_issues()
if issues is None:
exit(0)
label_ids = []
labels = []
match_all_labels_lower = []
for s in match_all_labels:
# print(p+"appending"
# " {} to match_all_labels_lower.".format(s.lower()))
match_all_labels_lower.append(s.lower())
match_count = 0
total_count = len(issues)
matching_issue = None
matching_issue_labels = None
# TODO: get labels another way, and make this conditional:
# if cmd == "list":
for issue in issues:
this_issue_labels_lower = []
for label in issue["labels"]:
label_ids.append(label["id"])
if label["url"].startswith(labels_url):
start = len(labels_url) + 1 # +1 for "/"
label_encoded = label["url"][start:]
label_s = unquote(label_encoded)
this_issue_labels_lower.append(label_s.lower())
if label_s not in labels:
labels.append(label_s)
else:
raise ValueError(p+"ERROR: The url '{}' does not start with"
" '{}'".format(label["url"], labels_url))
if len(match_all_labels) > 0:
this_issue_match_count = 0
for try_label in match_all_labels_lower:
if try_label in this_issue_labels_lower:
this_issue_match_count += 1
# else:
# print(p+"{} is not a match ({} is not in"
# " {})".format(issue["number"], try_label,
# this_issue_labels_lower))
if this_issue_match_count == len(match_all_labels):
match_count += 1
print("#{} {}".format(issue["number"], issue["title"]))
elif (match_number is None) and (cmd == "list"):
# Show all since no criteria is set.
match_count += 1
if self.page is not None:
query_s = self.issues_url + "?page=" + str(self.page)
try:
response = request.urlopen(query_s)
except urllib.error.HTTPError as e:
print(p+"You may be able to view the issues on GitHub")
print(p+"at the 'html_url', and a login may be required.")
print(p+"The URL \"{}\" is not accessible, so you may have"
" exceeded the rate limit and be blocked"
" temporarily:".format(query_s))
print(str(e))
return None
response_s = decode_safe(response.read())
if not os.path.isdir(self.c_repo_path):
os.makedirs(self.c_repo_path)
print(p+"Saving issues cache: {}".format(c_issues_path))
with open(c_issues_path, "w") as outs:
outs.write(response_s)
results = json.loads(response_s)
return results
def show_issue(self, issue):
p = self.log_prefix
print("")
print("#{} {}".format(issue["number"], issue["title"]))
if match_number is not None:
# INFO: match_number & issue["number"] are ints
if match_number == issue["number"]:
matching_issue = issue
matching_issue_labels = this_issue_labels_lower
issue = None # prevent value from leaking
def show_issue(issue):
print("")
print("#{} {}".format(issue["number"], issue["title"]))
# print(issue["html_url"])
print("")
this_issue_json_url = issue["url"]
issue_data_bytes = None
try:
response = request.urlopen(this_issue_json_url)
issue_data_bytes = response.read()
except urllib.error.HTTPError as e:
print(str(e))
print(p+"The URL \"{}\" is not accessible, so you may have"
" exceeded the rate limit and be blocked"
" temporarily:".format(this_issue_json_url))
html_url = issue.get("html_url")
print(p+"You may be able to view the issue on GitHub")
print(p+"at the 'html_url', and a login may be required:")
print(p+"html_url: {}".format(html_url))
return False
issue_data_s = decode_safe(issue_data_bytes)
issue_data = json.loads(issue_data_s)
markdown = issue_data.get("body")
# ^ It is (always?) present but allowed to be None by GitHub!
if markdown is not None:
markdown = markdown.replace("\\r\\n", "\n").replace("\\t", "\t")
left_w = 10
spacer = " "
line_fmt = "{: <" + str(left_w) + "}" + spacer + "{}"
print(line_fmt.format("html_url:", issue["html_url"]))
print(line_fmt.format("by:", issue_data["user"]["login"]))
print(line_fmt.format("state:", issue_data["state"]))
assignees = issue_data.get("assignees")
if (assignees is not None) and len(assignees) > 1:
assignee_names = [a["login"] for a in assignees]
print(line_fmt.format("assignees:", " ".join(assignee_names)))
elif issue_data.get("assignee") is not None:
assignee_name = issue_data["assignee"]["login"]
print(line_fmt.format("assignee:", assignee_name))
labels_s = "None"
if len(matching_issue_labels) > 0:
neat_labels = []
for label_s in matching_issue_labels:
if " " in label_s:
neat_labels.append('"' + label_s + '"')
else:
neat_labels.append(label_s)
labels_s = ", ".join(neat_labels)
print(line_fmt.format("labels:", labels_s))
print("")
if markdown is not None:
print('"""')
print(markdown)
print('"""')
else:
print("(no description)")
if issue_data["comments"] > 0:
# print(issue["html_url"])
print("")
this_issue_json_url = issue["url"]
issue_data_bytes = None
try:
response = request.urlopen(this_issue_json_url)
issue_data_bytes = response.read()
except urllib.error.HTTPError as e:
print(str(e))
print(p+"The URL \"{}\" is not accessible, so you may have"
" exceeded the rate limit and be blocked"
" temporarily:".format(this_issue_json_url))
html_url = issue.get("html_url")
print(p+"You may be able to view the issue on GitHub")
print(p+"at the 'html_url', and a login may be required:")
print(p+"html_url: {}".format(html_url))
return False
issue_data_s = decode_safe(issue_data_bytes)
issue_data = json.loads(issue_data_s)
markdown = issue_data.get("body")
# ^ It is (always?) present but allowed to be None by GitHub!
if markdown is not None:
markdown = markdown.replace("\\r\\n", "\n").replace(
"\\t",
"\t"
)
left_w = 10
spacer = " "
line_fmt = "{: <" + str(left_w) + "}" + spacer + "{}"
print(line_fmt.format("html_url:", issue["html_url"]))
print(line_fmt.format("by:", issue_data["user"]["login"]))
print(line_fmt.format("state:", issue_data["state"]))
assignees = issue_data.get("assignees")
if (assignees is not None) and len(assignees) > 1:
assignee_names = [a["login"] for a in assignees]
print(line_fmt.format("assignees:",
" ".join(assignee_names)))
elif issue_data.get("assignee") is not None:
assignee_name = issue_data["assignee"]["login"]
print(line_fmt.format("assignee:", assignee_name))
labels_s = "None"
if len(issue['lower_labels']) > 0:
neat_labels = []
for label_s in issue['lower_labels']:
if " " in label_s:
neat_labels.append('"' + label_s + '"')
else:
neat_labels.append(label_s)
labels_s = ", ".join(neat_labels)
print(line_fmt.format("labels:", labels_s))
print("")
print("({}) comment(s):".format(issue_data["comments"]))
this_cmts_json_url = issue_data["comments_url"]
response = request.urlopen(this_cmts_json_url)
cmts_data_s = decode_safe(response.read())
cmts_data = json.loads(cmts_data_s)
left_margin = " "
c_prop_fmt = (left_margin + "{: <" + str(left_w) + "}" + spacer
+ "{}")
for cmt in cmts_data:
print("")
print("")
print(c_prop_fmt.format("from:", cmt["user"]["login"]))
print(c_prop_fmt.format("updated_at:", cmt["updated_at"]))
if markdown is not None:
print('"""')
print(markdown)
print('"""')
else:
print("(no description)")
if issue_data["comments"] > 0:
print("")
print(left_margin + '"""')
print(left_margin + cmt["body"])
print(left_margin + '"""')
print("")
print("({}) comment(s):".format(issue_data["comments"]))
this_cmts_json_url = issue_data["comments_url"]
response = request.urlopen(this_cmts_json_url)
cmts_data_s = decode_safe(response.read())
cmts_data = json.loads(cmts_data_s)
left_margin = " "
c_prop_fmt = (left_margin + "{: <" + str(left_w) + "}"
+ spacer + "{}")
for cmt in cmts_data:
print("")
print("")
print(c_prop_fmt.format("from:", cmt["user"]["login"]))
print(c_prop_fmt.format("updated_at:",
cmt["updated_at"]))
print("")
print(left_margin + '"""')
print(left_margin + cmt["body"])
print(left_margin + '"""')
print("")
print("")
print("")
return True
def load_issues(self):
self.issues = self.get_issues()
def get_match(self, mode, match_number=None, match_all_labels_lower=[]):
'''
Sequential arguments:
mode -- This must be a valid mode
(a key in the modes dictionary).
Keyword arguments:
match_number -- Match this issue number (None for multiple).
match_all_labels_lower -- Only match where all of these labels
are on the issue.
'''
matching_issue = None
match_count = 0
# TODO: get labels another way, and make this conditional:
# if mode == "list":
for issue in self.issues:
this_issue_labels_lower = []
for label in issue["labels"]:
self.label_ids.append(label["id"])
if label["url"].startswith(self.labels_url):
start = len(self.labels_url) + 1 # +1 for "/"
label_encoded = label["url"][start:]
label_s = unquote(label_encoded)
this_issue_labels_lower.append(label_s.lower())
if label_s not in self.labels:
self.labels.append(label_s)
else:
raise ValueError(p+"ERROR: The url '{}' does not"
" start with '{}'"
"".format(label["url"],
self.labels_url))
if len(match_all_labels) > 0:
this_issue_match_count = 0
for try_label in match_all_labels_lower:
if try_label in this_issue_labels_lower:
this_issue_match_count += 1
# else:
# print(p+"{} is not a match ({} is not in"
# " {})".format(issue["number"], try_label,
# this_issue_labels_lower))
if this_issue_match_count == len(match_all_labels):
match_count += 1
print("#{} {}".format(issue["number"], issue["title"]))
elif (match_number is None) and (mode == "list"):
# Show all since no criteria is set.
match_count += 1
print("#{} {}".format(issue["number"], issue["title"]))
if match_number is not None:
# INFO: match_number & issue["number"] are ints
if match_number == issue["number"]:
matching_issue = issue
issue['lower_labels'] = this_issue_labels_lower
return {'issue':matching_issue, 'count':match_count}
def main():
mode = None
repo = Repo()
prev_arg = None
match_number = None
for i in range(1, len(sys.argv)):
arg = sys.argv[i]
if arg.startswith("#"):
arg = arg[1:]
if (mode is None) and (arg in modes.keys()):
# don't set the command to list unless the enclosing case is
# true. If a label was specified, paging is handled in the
# other case.
if arg == "page":
mode = "list"
else:
mode = arg
else:
try:
i = int(arg)
if prev_arg == "page":
repo.page = i
else:
match_number = i
except ValueError:
if (mode is None) and (modes.get(arg) is not None):
mode = arg
else:
if arg != "page":
# print("* adding criteria: {}".format(arg))
mode = "list"
match_all_labels.append(arg)
prev_arg = arg
if mode is None:
if len(match_all_labels) > 1:
mode = "list"
if match_number is not None:
mode = "issue"
valid_modes = ["issue"]
for k, v in modes.items():
valid_modes.append(k)
if mode is None:
print()
print()
usage()
print()
print()
sys.exit(0)
elif mode not in valid_modes:
print()
print()
usage()
print()
print(mode + " is not a valid command.")
print()
print()
sys.exit(0)
print("")
print("")
return True
# print("Loading...")
# TODO: get labels another way, and make this conditional:
# if mode == "list":
repo.load_issues()
if repo.issues is None:
print("There were no issues.")
sys.exit(0)
match_all_labels_lower = []
p = repo.log_prefix
for s in match_all_labels:
# print(p+"appending"
# " {} to match_all_labels_lower.".format(s.lower()))
match_all_labels_lower.append(s.lower())
total_count = len(repo.issues)
match = repo.get_match(
mode,
match_number=match_number,
match_all_labels_lower=match_all_labels_lower,
)
matching_issue = match['issue']
if matching_issue is not None:
repo.show_issue(matching_issue)
if mode == "labels":
# print("Labels:")
# print("")
for label_s in repo.labels:
print(label_s)
print("")
print("The repo has {} label(s).".format(len(repo.labels)))
print("")
if total_count >= repo.page_size:
print("The maximum issue count per page was reached.")
next_page = 2
if repo.page is not None:
next_page = repo.page + 1
print(" ./" + me + " labels page " + str(next_page))
print("to see labels on additional pages.")
if matching_issue is not None:
show_issue(matching_issue)
elif mode == "list":
print()
if len(match_all_labels) > 0:
print("{} issue(s) matched {}".format(
match['count'],
" + ".join("'{}'".format(s) for s in match_all_labels)
))
if total_count >= repo.page_size:
print("{} searched, which is the maximum number"
" per page.".format(total_count))
next_page = 2
if repo.page is not None:
next_page = repo.page + 1
print(" ./" + me + " " + " ".join(match_all_labels)
+ " page " + str(next_page))
print("to see additional pages.")
if cmd == "labels":
# print("Labels:")
# print("")
for label_s in labels:
print(label_s)
print("")
print("The repo has {} label(s).".format(len(labels)))
print("")
if total_count >= page_size:
print("The maximum issue count per page was reached.")
next_page = 2
if page is not None:
next_page = page + 1
print(" ./" + me + " labels page " + str(next_page))
print("to see labels on additional pages.")
elif cmd == "list":
print()
if len(match_all_labels) > 0:
print("{} issue(s) matched {}".format(
match_count,
" + ".join("'{}'".format(s) for s in match_all_labels)
))
if total_count >= page_size:
print("{} searched, which is the maximum number"
" per page.".format(total_count))
next_page = 2
if page is not None:
next_page = page + 1
print(" ./" + me + " " + " ".join(match_all_labels)
+ " page " + str(next_page))
print("to see additional pages.")
else:
if page is not None:
print("{} issue(s) are showing for page"
" {}.".format(match_count, page))
else:
print("{} issue(s) are showing.".format(match_count))
if total_count >= page_size:
print("That is the maximum number per page. Type")
next_page = 2
if page is not None:
next_page = page + 1
print(" ./" + me + " page " + str(next_page))
print("to see additional pages.")
if match_count > 0:
print()
print()
print("To view details, type")
print(" ./" + me)
print("followed by a number.")
print("")
if repo.page is not None:
print("{} issue(s) are showing for page"
" {}.".format(match['count'], repo.page))
else:
print("{} issue(s) are showing.".format(match['count']))
if total_count >= repo.page_size:
print("That is the maximum number per page. Type")
next_page = 2
if repo.page is not None:
next_page = repo.page + 1
print(" ./" + me + " page " + str(next_page))
print("to see additional pages.")
if match['count'] > 0:
print()
print()
print("To view details, type")
print(" ./" + me)
print("followed by a number.")
print("")
if __name__ == "__main__":
main()

Loading…
Cancel
Save