diff --git a/utilities/enissue.py b/utilities/enissue.py index f2b8d71..6e20f94 100755 --- a/utilities/enissue.py +++ b/utilities/enissue.py @@ -34,10 +34,10 @@ def decode_safe(b): return s -cmd = None + # me = sys.argv[0] me = os.path.basename(__file__) -cmds = { +modes = { "list": { "help": ("List all issues. Provide one or more labels to narrow" " down the list. Alternatively, provide a label only." @@ -71,7 +71,7 @@ def usage(): left_w = 10 spacer = " -- " line_fmt = "{: <" + str(left_w) + "}" + spacer + "{}" - for name, command in cmds.items(): + for name, command in modes.items(): hlp = command["help"] print(line_fmt.format(name, hlp)) if len(command["examples"]) > 0: @@ -82,341 +82,413 @@ def usage(): print("") -page = None -prev_arg = None -match_number = None -for i in range(1, len(sys.argv)): - arg = sys.argv[i] - if arg.startswith("#"): - arg = arg[1:] - if (cmd is None) and (arg in cmds.keys()): - # don't set the command to list unless the enclosing case is - # true. If a label was specified, paging is handled in the - # other case. - if arg == "page": - cmd = "list" - else: - cmd = arg - else: - try: - i = int(arg) - if prev_arg == "page": - page = i - else: - match_number = i - except ValueError: - if (cmd is None) and (cmds.get(arg) is not None): - cmd = arg +class Repo: + + caches_path = "/tmp/enissue" + + def __init__( + self, + remote_user = "poikilos", + repo_name = "EnlivenMinetest", + api_repo_url_fmt = "https://api.github.com/repos/{ru}/{rn}", + page_size = 30, + c_issues_name_fmt = "issues_page={p}{q}.json", + ): + ''' + Keyword arguments: + remote_user -- The repo user may be used in api_repo_url_fmt. + repo_name -- The repo name may be used in api_repo_url_fmt. + api_repo_url_fmt -- a format string where {ru} is where a repo + user goes (if any), and {rn} is where a + repo name goes (if any). + page_size -- This must be set to the page size that is + compatible with the URL in api_repo_url_fmt, such + as exactly 30 for GitHub. + c_issues_name_fmt -- This is the format of the URL query tail + that shows a certain page by page number, + where {p} is the page number and {q} is any + additional query such as "&label=bug". + ''' + self.page = None + self.remote_user = remote_user + self.repo_name = repo_name + self.c_remote_user_path = os.path.join(Repo.caches_path, + self.remote_user) + self.c_repo_path = os.path.join(self.c_remote_user_path, + self.repo_name) + self.api_repo_url_fmt = api_repo_url_fmt + self.repo_url = self.api_repo_url_fmt.format( + ru=remote_user, + rn=repo_name + ) + self.issues_url = self.repo_url + "/issues" + self.labels_url = self.repo_url + "/labels" + self.page_size = page_size + self.log_prefix = "@ " + self.c_issues_name_fmt = c_issues_name_fmt + + self.label_ids = [] # all label ids in the repo + self.labels = [] # all labels in the repo + + def get_issues(self): + query_s = self.issues_url + this_page = 1 + + query_part = "" + # TODO: IF longer API query is implemented, put something in + # query_part like "&label=Bucket_Game" + if self.page is not None: + this_page = self.page + c_issues_name = self.c_issues_name_fmt.format(p=this_page, + q=query_part) + c_issues_path = os.path.join(self.c_repo_path, c_issues_name) + p = self.log_prefix + if os.path.isfile(c_issues_path): + # See + max_cache_delta = timedelta(hours=12) + cache_delta = datetime.now() - max_cache_delta + c_issues_mtime = os.path.getmtime(c_issues_path) + filetime = datetime.fromtimestamp(c_issues_mtime) + if filetime > cache_delta: + print(p+"Loading cache: \"{}\"".format(c_issues_path)) + # print(p+"Cache time limit: {}".format(max_cache_delta) + print(p+"Cache expires: {}".format(filetime + + max_cache_delta)) + with open(c_issues_path) as json_file: + results = json.load(json_file) + # print(p+"The cache file has" + # " {} issue(s).".format(len(results))) + max_issue = None + for issue in results: + issue_n = issue.get("number") + if issue_n is not None: + if (max_issue is None) or (issue_n > max_issue): + max_issue = issue_n + print(p+"The highest cached issue# is {}.".format( + max_issue + )) + return results else: - if arg != "page": - # print("* adding criteria: {}".format(arg)) - cmd = "list" - match_all_labels.append(arg) - prev_arg = arg -if cmd is None: - if len(match_all_labels) > 1: - cmd = "list" - if match_number is not None: - cmd = "issue" -valid_cmds = ["issue"] -for k, v in cmds.items(): - valid_cmds.append(k) - -if cmd is None: - print() - print() - usage() - print() - print() - exit(0) -elif cmd not in valid_cmds: - print() - print() - usage() - print() - print(cmd + " is not a valid command.") - print() - print() - exit(0) - -print("") -# print("Loading...") - - -caches_path = "/tmp/enissue" -remote_user = "poikilos" -repo_name = "EnlivenMinetest" -c_remote_user_path = os.path.join(caches_path, remote_user) -c_repo_path = os.path.join(c_remote_user_path, repo_name) -api_repo_url_fmt = "https://api.github.com/repos/{}/{}" -repo_url = api_repo_url_fmt.format(remote_user, repo_name) -issues_url = repo_url + "/issues" -labels_url = repo_url + "/labels" -page_size = 30 # The GitHub API only lists 30 issues per page. -p = "@ " - - -def get_issues(): - query_s = issues_url - c_issues_name_fmt = "issues_page={}{}.json" - this_page = 1 - - query_part = "" - # TODO: IF longer API query is implemented, put something in - # query_part like "&label=Bucket_Game" - if page is not None: - this_page = page - c_issues_name = c_issues_name_fmt.format(this_page, query_part) - c_issues_path = os.path.join(c_repo_path, c_issues_name) - if os.path.isfile(c_issues_path): - # See - max_cache_delta = timedelta(hours=12) - cache_delta = datetime.now() - max_cache_delta - c_issues_mtime = os.path.getmtime(c_issues_path) - filetime = datetime.fromtimestamp(c_issues_mtime) - if filetime > cache_delta: - print(p+"Loading cache: \"{}\"".format(c_issues_path)) - # print(p+"Cache time limit: {}".format(max_cache_delta) - print(p+"Cache expires: {}".format(filetime - + max_cache_delta)) - with open(c_issues_path) as json_file: - results = json.load(json_file) - # print(p+"The cache file has" - # " {} issue(s).".format(len(results))) - max_issue = None - for issue in results: - issue_n = issue.get("number") - if issue_n is not None: - if (max_issue is None) or (issue_n > max_issue): - max_issue = issue_n - print(p+"The highest cached issue# is {}.".format(max_issue)) - return results + print(p+"Cache time limit: {}".format(max_cache_delta)) + print(p+"The cache has expired: \"{}\"".format( + c_issues_path + )) else: - print(p+"Cache time limit: {}".format(max_cache_delta)) - print(p+"The cache has expired: \"{}\"".format(c_issues_path)) - else: - print(p+"There is no cache for \"{}\"".format(c_issues_path)) + print(p+"There is no cache for \"{}\"".format( + c_issues_path + )) - if page is not None: - query_s = issues_url + "?page=" + str(page) - try: - response = request.urlopen(query_s) - except urllib.error.HTTPError as e: - print(p+"You may be able to view the issues on GitHub") - print(p+"at the 'html_url', and a login may be required.") - print(p+"The URL \"{}\" is not accessible, so you may have" - " exceeded the rate limit and be blocked" - " temporarily:".format(query_s)) - print(str(e)) - return None - response_s = decode_safe(response.read()) - if not os.path.isdir(c_repo_path): - os.makedirs(c_repo_path) - print(p+"Saving issues cache: {}".format(c_issues_path)) - with open(c_issues_path, "w") as outs: - outs.write(response_s) - results = json.loads(response_s) - return results - - -# TODO: get labels another way, and make this conditional: -# if cmd == "list": -issues = get_issues() -if issues is None: - exit(0) -label_ids = [] -labels = [] - -match_all_labels_lower = [] -for s in match_all_labels: - # print(p+"appending" - # " {} to match_all_labels_lower.".format(s.lower())) - match_all_labels_lower.append(s.lower()) - -match_count = 0 -total_count = len(issues) -matching_issue = None - -matching_issue_labels = None - -# TODO: get labels another way, and make this conditional: -# if cmd == "list": -for issue in issues: - this_issue_labels_lower = [] - for label in issue["labels"]: - label_ids.append(label["id"]) - if label["url"].startswith(labels_url): - start = len(labels_url) + 1 # +1 for "/" - label_encoded = label["url"][start:] - label_s = unquote(label_encoded) - this_issue_labels_lower.append(label_s.lower()) - if label_s not in labels: - labels.append(label_s) - else: - raise ValueError(p+"ERROR: The url '{}' does not start with" - " '{}'".format(label["url"], labels_url)) - if len(match_all_labels) > 0: - this_issue_match_count = 0 - for try_label in match_all_labels_lower: - if try_label in this_issue_labels_lower: - this_issue_match_count += 1 - # else: - # print(p+"{} is not a match ({} is not in" - # " {})".format(issue["number"], try_label, - # this_issue_labels_lower)) - if this_issue_match_count == len(match_all_labels): - match_count += 1 - print("#{} {}".format(issue["number"], issue["title"])) - elif (match_number is None) and (cmd == "list"): - # Show all since no criteria is set. - match_count += 1 + if self.page is not None: + query_s = self.issues_url + "?page=" + str(self.page) + try: + response = request.urlopen(query_s) + except urllib.error.HTTPError as e: + print(p+"You may be able to view the issues on GitHub") + print(p+"at the 'html_url', and a login may be required.") + print(p+"The URL \"{}\" is not accessible, so you may have" + " exceeded the rate limit and be blocked" + " temporarily:".format(query_s)) + print(str(e)) + return None + response_s = decode_safe(response.read()) + if not os.path.isdir(self.c_repo_path): + os.makedirs(self.c_repo_path) + print(p+"Saving issues cache: {}".format(c_issues_path)) + with open(c_issues_path, "w") as outs: + outs.write(response_s) + results = json.loads(response_s) + return results + + + def show_issue(self, issue): + p = self.log_prefix + print("") print("#{} {}".format(issue["number"], issue["title"])) - if match_number is not None: - # INFO: match_number & issue["number"] are ints - if match_number == issue["number"]: - matching_issue = issue - matching_issue_labels = this_issue_labels_lower -issue = None # prevent value from leaking - - -def show_issue(issue): - print("") - print("#{} {}".format(issue["number"], issue["title"])) - # print(issue["html_url"]) - print("") - this_issue_json_url = issue["url"] - issue_data_bytes = None - try: - response = request.urlopen(this_issue_json_url) - issue_data_bytes = response.read() - except urllib.error.HTTPError as e: - print(str(e)) - print(p+"The URL \"{}\" is not accessible, so you may have" - " exceeded the rate limit and be blocked" - " temporarily:".format(this_issue_json_url)) - html_url = issue.get("html_url") - print(p+"You may be able to view the issue on GitHub") - print(p+"at the 'html_url', and a login may be required:") - print(p+"html_url: {}".format(html_url)) - return False - issue_data_s = decode_safe(issue_data_bytes) - issue_data = json.loads(issue_data_s) - markdown = issue_data.get("body") - # ^ It is (always?) present but allowed to be None by GitHub! - if markdown is not None: - markdown = markdown.replace("\\r\\n", "\n").replace("\\t", "\t") - left_w = 10 - spacer = " " - line_fmt = "{: <" + str(left_w) + "}" + spacer + "{}" - print(line_fmt.format("html_url:", issue["html_url"])) - print(line_fmt.format("by:", issue_data["user"]["login"])) - print(line_fmt.format("state:", issue_data["state"])) - assignees = issue_data.get("assignees") - if (assignees is not None) and len(assignees) > 1: - assignee_names = [a["login"] for a in assignees] - print(line_fmt.format("assignees:", " ".join(assignee_names))) - elif issue_data.get("assignee") is not None: - assignee_name = issue_data["assignee"]["login"] - print(line_fmt.format("assignee:", assignee_name)) - labels_s = "None" - if len(matching_issue_labels) > 0: - neat_labels = [] - for label_s in matching_issue_labels: - if " " in label_s: - neat_labels.append('"' + label_s + '"') - else: - neat_labels.append(label_s) - labels_s = ", ".join(neat_labels) - print(line_fmt.format("labels:", labels_s)) - print("") - if markdown is not None: - print('"""') - print(markdown) - print('"""') - else: - print("(no description)") - if issue_data["comments"] > 0: + # print(issue["html_url"]) print("") + this_issue_json_url = issue["url"] + issue_data_bytes = None + try: + response = request.urlopen(this_issue_json_url) + issue_data_bytes = response.read() + except urllib.error.HTTPError as e: + print(str(e)) + print(p+"The URL \"{}\" is not accessible, so you may have" + " exceeded the rate limit and be blocked" + " temporarily:".format(this_issue_json_url)) + html_url = issue.get("html_url") + print(p+"You may be able to view the issue on GitHub") + print(p+"at the 'html_url', and a login may be required:") + print(p+"html_url: {}".format(html_url)) + return False + issue_data_s = decode_safe(issue_data_bytes) + issue_data = json.loads(issue_data_s) + markdown = issue_data.get("body") + # ^ It is (always?) present but allowed to be None by GitHub! + if markdown is not None: + markdown = markdown.replace("\\r\\n", "\n").replace( + "\\t", + "\t" + ) + left_w = 10 + spacer = " " + line_fmt = "{: <" + str(left_w) + "}" + spacer + "{}" + print(line_fmt.format("html_url:", issue["html_url"])) + print(line_fmt.format("by:", issue_data["user"]["login"])) + print(line_fmt.format("state:", issue_data["state"])) + assignees = issue_data.get("assignees") + if (assignees is not None) and len(assignees) > 1: + assignee_names = [a["login"] for a in assignees] + print(line_fmt.format("assignees:", + " ".join(assignee_names))) + elif issue_data.get("assignee") is not None: + assignee_name = issue_data["assignee"]["login"] + print(line_fmt.format("assignee:", assignee_name)) + labels_s = "None" + if len(issue['lower_labels']) > 0: + neat_labels = [] + for label_s in issue['lower_labels']: + if " " in label_s: + neat_labels.append('"' + label_s + '"') + else: + neat_labels.append(label_s) + labels_s = ", ".join(neat_labels) + print(line_fmt.format("labels:", labels_s)) print("") - print("({}) comment(s):".format(issue_data["comments"])) - this_cmts_json_url = issue_data["comments_url"] - response = request.urlopen(this_cmts_json_url) - cmts_data_s = decode_safe(response.read()) - cmts_data = json.loads(cmts_data_s) - left_margin = " " - c_prop_fmt = (left_margin + "{: <" + str(left_w) + "}" + spacer - + "{}") - for cmt in cmts_data: - print("") - print("") - print(c_prop_fmt.format("from:", cmt["user"]["login"])) - print(c_prop_fmt.format("updated_at:", cmt["updated_at"])) + if markdown is not None: + print('"""') + print(markdown) + print('"""') + else: + print("(no description)") + if issue_data["comments"] > 0: print("") - print(left_margin + '"""') - print(left_margin + cmt["body"]) - print(left_margin + '"""') print("") + print("({}) comment(s):".format(issue_data["comments"])) + this_cmts_json_url = issue_data["comments_url"] + response = request.urlopen(this_cmts_json_url) + cmts_data_s = decode_safe(response.read()) + cmts_data = json.loads(cmts_data_s) + left_margin = " " + c_prop_fmt = (left_margin + "{: <" + str(left_w) + "}" + + spacer + "{}") + for cmt in cmts_data: + print("") + print("") + print(c_prop_fmt.format("from:", cmt["user"]["login"])) + print(c_prop_fmt.format("updated_at:", + cmt["updated_at"])) + print("") + print(left_margin + '"""') + print(left_margin + cmt["body"]) + print(left_margin + '"""') + print("") + + print("") + print("") + return True + + def load_issues(self): + self.issues = self.get_issues() + + def get_match(self, mode, match_number=None, match_all_labels_lower=[]): + ''' + Sequential arguments: + mode -- This must be a valid mode + (a key in the modes dictionary). + + Keyword arguments: + match_number -- Match this issue number (None for multiple). + match_all_labels_lower -- Only match where all of these labels + are on the issue. + ''' + matching_issue = None + match_count = 0 + # TODO: get labels another way, and make this conditional: + # if mode == "list": + for issue in self.issues: + this_issue_labels_lower = [] + for label in issue["labels"]: + self.label_ids.append(label["id"]) + if label["url"].startswith(self.labels_url): + start = len(self.labels_url) + 1 # +1 for "/" + label_encoded = label["url"][start:] + label_s = unquote(label_encoded) + this_issue_labels_lower.append(label_s.lower()) + if label_s not in self.labels: + self.labels.append(label_s) + else: + raise ValueError(p+"ERROR: The url '{}' does not" + " start with '{}'" + "".format(label["url"], + self.labels_url)) + if len(match_all_labels) > 0: + this_issue_match_count = 0 + for try_label in match_all_labels_lower: + if try_label in this_issue_labels_lower: + this_issue_match_count += 1 + # else: + # print(p+"{} is not a match ({} is not in" + # " {})".format(issue["number"], try_label, + # this_issue_labels_lower)) + if this_issue_match_count == len(match_all_labels): + match_count += 1 + print("#{} {}".format(issue["number"], issue["title"])) + elif (match_number is None) and (mode == "list"): + # Show all since no criteria is set. + match_count += 1 + print("#{} {}".format(issue["number"], issue["title"])) + if match_number is not None: + # INFO: match_number & issue["number"] are ints + if match_number == issue["number"]: + matching_issue = issue + issue['lower_labels'] = this_issue_labels_lower + return {'issue':matching_issue, 'count':match_count} + + +def main(): + mode = None + repo = Repo() + prev_arg = None + match_number = None + for i in range(1, len(sys.argv)): + arg = sys.argv[i] + if arg.startswith("#"): + arg = arg[1:] + if (mode is None) and (arg in modes.keys()): + # don't set the command to list unless the enclosing case is + # true. If a label was specified, paging is handled in the + # other case. + if arg == "page": + mode = "list" + else: + mode = arg + else: + try: + i = int(arg) + if prev_arg == "page": + repo.page = i + else: + match_number = i + except ValueError: + if (mode is None) and (modes.get(arg) is not None): + mode = arg + else: + if arg != "page": + # print("* adding criteria: {}".format(arg)) + mode = "list" + match_all_labels.append(arg) + prev_arg = arg + if mode is None: + if len(match_all_labels) > 1: + mode = "list" + if match_number is not None: + mode = "issue" + valid_modes = ["issue"] + for k, v in modes.items(): + valid_modes.append(k) + + if mode is None: + print() + print() + usage() + print() + print() + sys.exit(0) + elif mode not in valid_modes: + print() + print() + usage() + print() + print(mode + " is not a valid command.") + print() + print() + sys.exit(0) print("") - print("") - return True + # print("Loading...") + + # TODO: get labels another way, and make this conditional: + # if mode == "list": + repo.load_issues() + if repo.issues is None: + print("There were no issues.") + sys.exit(0) + + match_all_labels_lower = [] + p = repo.log_prefix + for s in match_all_labels: + # print(p+"appending" + # " {} to match_all_labels_lower.".format(s.lower())) + match_all_labels_lower.append(s.lower()) + + total_count = len(repo.issues) + match = repo.get_match( + mode, + match_number=match_number, + match_all_labels_lower=match_all_labels_lower, + ) + matching_issue = match['issue'] + + if matching_issue is not None: + repo.show_issue(matching_issue) + + if mode == "labels": + # print("Labels:") + # print("") + for label_s in repo.labels: + print(label_s) + print("") + print("The repo has {} label(s).".format(len(repo.labels))) + print("") + if total_count >= repo.page_size: + print("The maximum issue count per page was reached.") + next_page = 2 + if repo.page is not None: + next_page = repo.page + 1 + print(" ./" + me + " labels page " + str(next_page)) + print("to see labels on additional pages.") -if matching_issue is not None: - show_issue(matching_issue) + elif mode == "list": + print() + if len(match_all_labels) > 0: + print("{} issue(s) matched {}".format( + match['count'], + " + ".join("'{}'".format(s) for s in match_all_labels) + )) + if total_count >= repo.page_size: + print("{} searched, which is the maximum number" + " per page.".format(total_count)) + next_page = 2 + if repo.page is not None: + next_page = repo.page + 1 + print(" ./" + me + " " + " ".join(match_all_labels) + + " page " + str(next_page)) + print("to see additional pages.") -if cmd == "labels": - # print("Labels:") - # print("") - for label_s in labels: - print(label_s) - print("") - print("The repo has {} label(s).".format(len(labels))) - print("") - if total_count >= page_size: - print("The maximum issue count per page was reached.") - next_page = 2 - if page is not None: - next_page = page + 1 - print(" ./" + me + " labels page " + str(next_page)) - print("to see labels on additional pages.") - - -elif cmd == "list": - print() - if len(match_all_labels) > 0: - print("{} issue(s) matched {}".format( - match_count, - " + ".join("'{}'".format(s) for s in match_all_labels) - )) - if total_count >= page_size: - print("{} searched, which is the maximum number" - " per page.".format(total_count)) - next_page = 2 - if page is not None: - next_page = page + 1 - print(" ./" + me + " " + " ".join(match_all_labels) - + " page " + str(next_page)) - print("to see additional pages.") - - else: - if page is not None: - print("{} issue(s) are showing for page" - " {}.".format(match_count, page)) else: - print("{} issue(s) are showing.".format(match_count)) - if total_count >= page_size: - print("That is the maximum number per page. Type") - next_page = 2 - if page is not None: - next_page = page + 1 - print(" ./" + me + " page " + str(next_page)) - print("to see additional pages.") - if match_count > 0: - print() - print() - print("To view details, type") - print(" ./" + me) - print("followed by a number.") -print("") + if repo.page is not None: + print("{} issue(s) are showing for page" + " {}.".format(match['count'], repo.page)) + else: + print("{} issue(s) are showing.".format(match['count'])) + if total_count >= repo.page_size: + print("That is the maximum number per page. Type") + next_page = 2 + if repo.page is not None: + next_page = repo.page + 1 + print(" ./" + me + " page " + str(next_page)) + print("to see additional pages.") + if match['count'] > 0: + print() + print() + print("To view details, type") + print(" ./" + me) + print("followed by a number.") + print("") + + +if __name__ == "__main__": + main()